diff mbox series

[ovs-dev,v2,4/4] northd: Add node for IGMP and Multicast data.

Message ID 20250116122532.725005-5-amusil@redhat.com
State Superseded
Headers show
Series Incremental processing of IGMP changes in northd. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Ales Musil Jan. 16, 2025, 12:25 p.m. UTC
Add new I-P node that will store all the data for IGMP and
Multicast groups. This node allows to avoid full recompute of lflow
node when IGMP or Multicast SB table changes.

The node itself still does full recompute for IGMP and Multicast
changes however this is a compromise between code complexity and
the time it takes for all lflow to be created. At the same time
thi brings the benefit of having the data available when there
is recompute of the lflow node.

As design choice there is only single lflow_ref for all IGMP
lflows, that makes them not being thread safe and only main thread
can generate them during full recompute of lflow node. This shouldn't
be an issue, because the computation of igmp lflow is pretty simple.

Reported-at: https://issues.redhat.com/browse/FDP-756
Co-authored-by: Jacob Tanenbaum <jtanenba@redhat.com>
Signed-off-by: Jacob Tanenbaum <jtanenba@redhat.com>
Suggested-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Ales Musil <amusil@redhat.com>
---
 northd/en-lflow.c        |  52 ++++++++-
 northd/en-lflow.h        |   1 +
 northd/en-multicast.c    | 223 ++++++++++++++++++++++++++++-----------
 northd/en-multicast.h    |  24 ++---
 northd/inc-proc-northd.c |  10 +-
 northd/northd.c          |  99 +++++++----------
 northd/northd.h          |  10 +-
 tests/ovn-northd.at      |  89 ++++++++++++++++
 8 files changed, 364 insertions(+), 144 deletions(-)

Comments

Mark Michelson Jan. 27, 2025, 7:06 p.m. UTC | #1
Hi Ales,

I have one finding below.

On 1/16/25 07:25, Ales Musil wrote:
> Add new I-P node that will store all the data for IGMP and
> Multicast groups. This node allows to avoid full recompute of lflow
> node when IGMP or Multicast SB table changes.
> 
> The node itself still does full recompute for IGMP and Multicast
> changes however this is a compromise between code complexity and
> the time it takes for all lflow to be created. At the same time
> thi brings the benefit of having the data available when there
> is recompute of the lflow node.
> 
> As design choice there is only single lflow_ref for all IGMP
> lflows, that makes them not being thread safe and only main thread
> can generate them during full recompute of lflow node. This shouldn't
> be an issue, because the computation of igmp lflow is pretty simple.
> 
> Reported-at: https://issues.redhat.com/browse/FDP-756
> Co-authored-by: Jacob Tanenbaum <jtanenba@redhat.com>
> Signed-off-by: Jacob Tanenbaum <jtanenba@redhat.com>
> Suggested-by: Dumitru Ceara <dceara@redhat.com>
> Signed-off-by: Ales Musil <amusil@redhat.com>
> ---
>   northd/en-lflow.c        |  52 ++++++++-
>   northd/en-lflow.h        |   1 +
>   northd/en-multicast.c    | 223 ++++++++++++++++++++++++++++-----------
>   northd/en-multicast.h    |  24 ++---
>   northd/inc-proc-northd.c |  10 +-
>   northd/northd.c          |  99 +++++++----------
>   northd/northd.h          |  10 +-
>   tests/ovn-northd.at      |  89 ++++++++++++++++
>   8 files changed, 364 insertions(+), 144 deletions(-)
> 
> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> index fa1f0236d..e2816f4da 100644
> --- a/northd/en-lflow.c
> +++ b/northd/en-lflow.c
> @@ -23,6 +23,7 @@
>   #include "en-lr-nat.h"
>   #include "en-lr-stateful.h"
>   #include "en-ls-stateful.h"
> +#include "en-multicast.h"
>   #include "en-northd.h"
>   #include "en-meters.h"
>   #include "en-sampling-app.h"
> @@ -56,13 +57,11 @@ lflow_get_input_data(struct engine_node *node,
>           engine_get_input_data("lr_stateful", node);
>       struct ed_type_ls_stateful *ls_stateful_data =
>           engine_get_input_data("ls_stateful", node);
> +    struct multicast_igmp_data *multicat_igmp_data =
> +        engine_get_input_data("multicast_igmp", node);
>   
>       lflow_input->sbrec_logical_flow_table =
>           EN_OVSDB_GET(engine_get_input("SB_logical_flow", node));
> -    lflow_input->sbrec_multicast_group_table =
> -        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
> -    lflow_input->sbrec_igmp_group_table =
> -        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
>       lflow_input->sbrec_logical_dp_group_table =
>           EN_OVSDB_GET(engine_get_input("SB_logical_dp_group", node));
>   
> @@ -85,6 +84,8 @@ lflow_get_input_data(struct engine_node *node,
>       lflow_input->parsed_routes = &routes_data->parsed_routes;
>       lflow_input->route_tables = &routes_data->route_tables;
>       lflow_input->route_policies = &route_policies_data->route_policies;
> +    lflow_input->igmp_groups = &multicat_igmp_data->igmp_groups;
> +    lflow_input->igmp_lflow_ref = multicat_igmp_data->lflow_ref;
>   
>       struct ed_type_global_config *global_config =
>           engine_get_input_data("global_config", node);
> @@ -110,6 +111,7 @@ void en_lflow_run(struct engine_node *node, void *data)
>       struct lflow_data *lflow_data = data;
>       lflow_table_clear(lflow_data->lflow_table);
>       lflow_reset_northd_refs(&lflow_input);
> +    lflow_ref_clear(lflow_input.igmp_lflow_ref);
>   
>       build_lflows(eng_ctx->ovnsb_idl_txn, &lflow_input,
>                    lflow_data->lflow_table);
> @@ -219,6 +221,48 @@ lflow_ls_stateful_handler(struct engine_node *node, void *data)
>       return true;
>   }
>   
> +bool
> +lflow_multicast_igmp_handler(struct engine_node *node, void *data)
> +{
> +    struct multicast_igmp_data *mcast_igmp_data =
> +        engine_get_input_data("multicast_igmp", node);
> +
> +    const struct engine_context *eng_ctx = engine_get_context();
> +    struct lflow_data *lflow_data = data;
> +    struct lflow_input lflow_input;
> +    lflow_get_input_data(node, &lflow_input);
> +
> +    if (!lflow_ref_resync_flows(mcast_igmp_data->lflow_ref,
> +                                lflow_data->lflow_table,
> +                                eng_ctx->ovnsb_idl_txn,
> +                                lflow_input.ls_datapaths,
> +                                lflow_input.lr_datapaths,
> +                                lflow_input.ovn_internal_version_changed,
> +                                lflow_input.sbrec_logical_flow_table,
> +                                lflow_input.sbrec_logical_dp_group_table)) {
> +        return false;
> +    }
> +
> +    build_igmp_lflows(&mcast_igmp_data->igmp_groups,
> +                      &lflow_input.ls_datapaths->datapaths,
> +                      lflow_data->lflow_table,
> +                      mcast_igmp_data->lflow_ref);
> +
> +    if (!lflow_ref_sync_lflows(mcast_igmp_data->lflow_ref,
> +                               lflow_data->lflow_table,
> +                               eng_ctx->ovnsb_idl_txn,
> +                               lflow_input.ls_datapaths,
> +                               lflow_input.lr_datapaths,
> +                               lflow_input.ovn_internal_version_changed,
> +                               lflow_input.sbrec_logical_flow_table,
> +                               lflow_input.sbrec_logical_dp_group_table)) {
> +        return false;
> +    }
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
>   void *en_lflow_init(struct engine_node *node OVS_UNUSED,
>                        struct engine_arg *arg OVS_UNUSED)
>   {
> diff --git a/northd/en-lflow.h b/northd/en-lflow.h
> index 32cae6176..f90f5c61c 100644
> --- a/northd/en-lflow.h
> +++ b/northd/en-lflow.h
> @@ -22,5 +22,6 @@ bool lflow_northd_handler(struct engine_node *, void *data);
>   bool lflow_port_group_handler(struct engine_node *, void *data);
>   bool lflow_lr_stateful_handler(struct engine_node *, void *data);
>   bool lflow_ls_stateful_handler(struct engine_node *node, void *data);
> +bool lflow_multicast_igmp_handler(struct engine_node *node, void *data);
>   
>   #endif /* EN_LFLOW_H */
> diff --git a/northd/en-multicast.c b/northd/en-multicast.c
> index 0f07cf2fe..59a36f38b 100644
> --- a/northd/en-multicast.c
> +++ b/northd/en-multicast.c
> @@ -22,6 +22,7 @@
>   
>   /* OVN includes. */
>   #include "en-multicast.h"
> +#include "lflow-mgr.h"
>   #include "lib/ip-mcast-index.h"
>   #include "lib/mcast-group-index.h"
>   #include "lib/ovn-l7.h"
> @@ -47,6 +48,16 @@ static const struct multicast_group mc_unknown =
>   static const struct multicast_group mc_flood_l2 =
>       { MC_FLOOD_L2, OVN_MCAST_FLOOD_L2_TUNNEL_KEY };
>   
> +static void build_mcast_groups(
> +    struct multicast_igmp_data *, const struct sbrec_igmp_group_table *,
> +    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
> +    const struct hmap *ls_datapaths, const struct hmap *ls_ports,
> +    const struct hmap *lr_ports);
> +static void sync_multicast_groups_to_sb(
> +    struct multicast_igmp_data *, struct ovsdb_idl_txn *,
> +    const struct sbrec_multicast_group_table *,
> +    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths);
> +
>   static bool multicast_group_equal(const struct multicast_group *,
>                                     const struct multicast_group *);
>   static uint32_t ovn_multicast_hash(const struct ovn_datapath *,
> @@ -65,6 +76,7 @@ static void ovn_multicast_destroy(struct hmap *mcgroups,
>                                     struct ovn_multicast *);
>   static void ovn_multicast_update_sbrec(const struct ovn_multicast *,
>                                          const struct sbrec_multicast_group *);
> +static void ovn_multicast_groups_destroy(struct hmap *mcast_groups);
>   
>   static uint32_t ovn_igmp_group_hash(const struct ovn_datapath *,
>                                       const struct in6_addr *);
> @@ -88,29 +100,123 @@ static void ovn_igmp_group_aggregate_ports(struct ovn_igmp_group *,
>                                              struct hmap *mcast_groups);
>   static void ovn_igmp_group_destroy(struct hmap *igmp_groups,
>                                      struct ovn_igmp_group *);
> +static void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
> +
> +void *
> +en_multicast_igmp_init(struct engine_node *node OVS_UNUSED,
> +                       struct engine_arg *arg OVS_UNUSED)
> +{
> +    struct multicast_igmp_data *data =xmalloc(sizeof *data);
> +    hmap_init(&data->mcast_groups);
> +    hmap_init(&data->igmp_groups);
> +    data->lflow_ref = lflow_ref_create();
> +
> +    return data;
> +}
>   
>   void
> -build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
> +en_multicast_igmp_run(struct engine_node *node, void *data_)
> +{
> +    struct multicast_igmp_data *data = data_;
> +    struct northd_data *northd_data = engine_get_input_data("northd", node);
> +    const struct sbrec_igmp_group_table *sbrec_igmp_group_table =
> +        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
> +    const struct sbrec_multicast_group_table *sbrec_multicast_group_table =
> +        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
> +    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp =
> +        engine_ovsdb_node_get_index(
> +            engine_get_input("SB_multicast_group", node),
> +            "sbrec_mcast_group_by_name");
> +    const struct engine_context *eng_ctx = engine_get_context();
> +
> +    ovn_multicast_groups_destroy(&data->mcast_groups);
> +    ovn_igmp_groups_destroy(&data->igmp_groups);
> +
> +    build_mcast_groups(data, sbrec_igmp_group_table,
> +                      sbrec_mcast_group_by_name_dp,
> +                      &northd_data->ls_datapaths.datapaths,
> +                      &northd_data->ls_ports,
> +                      &northd_data->lr_ports);
> +    sync_multicast_groups_to_sb(data, eng_ctx->ovnsb_idl_txn,
> +                                sbrec_multicast_group_table,
> +                                &northd_data->ls_datapaths.datapaths,
> +                                &northd_data->lr_datapaths.datapaths);
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +}
> +
> +bool
> +multicast_igmp_northd_handler(struct engine_node *node, void *data OVS_UNUSED)
> +{
> +    struct northd_data *northd_data = engine_get_input_data("northd", node);
> +    if (!northd_has_tracked_data(&northd_data->trk_data)) {
> +        return false;
> +    }
> +
> +    /* This node uses the below data from the en_northd engine node.
> +     *      - northd_data->lr_datapaths
> +     *      - northd_data->ls_ports
> +     *      - northd_data->lr_ports
> +     *
> +     *      This data gets updated when a logical router is created or deleted.
> +     *      northd engine node presently falls back to full recompute when
> +     *      this happens and so does this node.
> +     *      Note: When we add I-P to the created/deleted logical routers, we
> +     *      need to revisit this handler.
> +     *
> +     *      This node also accesses the router ports of the logical router
> +     *      (od->ports).  When these logical router ports gets updated,
> +     *      en_northd engine recomputes and so does this node.
> +     *      Note: When we add I-P to handle switch/router port changes, we
> +     *      need to revisit this handler.
> +     *
> +     * */
> +    return true;
> +}
> +
> +void
> +en_multicast_igmp_cleanup(void *data_)
> +{
> +    struct multicast_igmp_data *data = data_;
> +
> +    ovn_multicast_groups_destroy(&data->mcast_groups);
> +    ovn_igmp_groups_destroy(&data->igmp_groups);
> +    hmap_destroy(&data->mcast_groups);
> +    hmap_destroy(&data->igmp_groups);
> +    lflow_ref_destroy(data->lflow_ref);
> +}
> +
> +struct sbrec_multicast_group *
> +create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
> +                          const struct sbrec_datapath_binding *dp,
> +                          const char *name,
> +                          int64_t tunnel_key)
> +{
> +    struct sbrec_multicast_group *sbmc =
> +        sbrec_multicast_group_insert(ovnsb_txn);
> +    sbrec_multicast_group_set_datapath(sbmc, dp);
> +    sbrec_multicast_group_set_name(sbmc, name);
> +    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
> +    return sbmc;
> +}
> +
> +static void
> +build_mcast_groups(struct multicast_igmp_data *data,
> +                   const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>                      struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
>                      const struct hmap *ls_datapaths,
>                      const struct hmap *ls_ports,
> -                   const struct hmap *lr_ports,
> -                   struct hmap *mcast_groups,
> -                   struct hmap *igmp_groups)
> -{
> +                   const struct hmap *lr_ports) {
>       struct ovn_datapath *od;
>       struct ovn_port *op;
>   
> -    hmap_init(mcast_groups);
> -    hmap_init(igmp_groups);
> -
>       HMAP_FOR_EACH (op, key_node, lr_ports) {
>           if (lrport_is_enabled(op->nbrp)) {
>               /* If this port is configured to always flood multicast traffic
>                * add it to the MC_STATIC group.
>                */
>               if (op->mcast_info.flood) {
> -                ovn_multicast_add(mcast_groups, &mc_static, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
>                   op->od->mcast_info.rtr.flood_static = true;
>               }
>           }
> @@ -118,14 +224,14 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>   
>       HMAP_FOR_EACH (op, key_node, ls_ports) {
>           if (lsp_is_enabled(op->nbsp)) {
> -            ovn_multicast_add(mcast_groups, &mc_flood, op);
> +            ovn_multicast_add(&data->mcast_groups, &mc_flood, op);
>   
>               if (!lsp_is_router(op->nbsp)) {
> -                ovn_multicast_add(mcast_groups, &mc_flood_l2, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_flood_l2, op);
>               }
>   
>               if (op->has_unknown) {
> -                ovn_multicast_add(mcast_groups, &mc_unknown, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_unknown, op);
>               }
>   
>               /* If this port is connected to a multicast router then add it
> @@ -133,7 +239,7 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>                */
>               if (op->od->mcast_info.sw.flood_relay && op->peer &&
>                   op->peer->od && op->peer->od->mcast_info.rtr.relay) {
> -                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_mrouter_flood, op);
>               }
>   
>               /* If this port is configured to always flood multicast reports
> @@ -141,7 +247,7 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>                * flooded to statically configured or learned mrouters).
>                */
>               if (op->mcast_info.flood_reports) {
> -                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_mrouter_flood, op);
>                   op->od->mcast_info.sw.flood_reports = true;
>               }
>   
> @@ -149,7 +255,7 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>                * add it to the MC_STATIC group.
>                */
>               if (op->mcast_info.flood) {
> -                ovn_multicast_add(mcast_groups, &mc_static, op);
> +                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
>                   op->od->mcast_info.sw.flood_static = true;
>               }
>           }
> @@ -202,7 +308,8 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>            * if the multicast group already exists.
>            */
>           struct ovn_igmp_group *igmp_group =
> -            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp, igmp_groups, od,
> +            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
> +                               &data->igmp_groups, od,
>                                  &group_address, sb_igmp->address);
>   
>           /* Add the extracted ports to the IGMP group. */
> @@ -240,7 +347,7 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>   
>               struct ovn_igmp_group *igmp_group_rtr =
>                   ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
> -                                   igmp_groups, router_port->od,
> +                                   &data->igmp_groups, router_port->od,
>                                      &group_address, igmp_group->mcgroup.name);
>               struct ovn_port **router_igmp_ports =
>                   xmalloc(sizeof *router_igmp_ports);
> @@ -260,41 +367,41 @@ build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
>        * explicitly.
>        */
>       struct ovn_igmp_group *igmp_group;
> -    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> +    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, &data->igmp_groups) {
>   
>           /* If this is a mrouter entry just aggregate the mrouter ports
>            * into the MC_MROUTER mcast_group and destroy the igmp_group;
>            * no more processing needed. */
>           if (!strcmp(igmp_group->mcgroup.name, OVN_IGMP_GROUP_MROUTERS)) {
> -            ovn_igmp_mrouter_aggregate_ports(igmp_group, mcast_groups);
> -            ovn_igmp_group_destroy(igmp_groups, igmp_group);
> +            ovn_igmp_mrouter_aggregate_ports(igmp_group, &data->mcast_groups);
> +            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
>               continue;
>           }
>   
>           if (!ovn_igmp_group_allocate_id(igmp_group)) {
>               /* If we ran out of keys just destroy the entry. */
> -            ovn_igmp_group_destroy(igmp_groups, igmp_group);
> +            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
>               continue;
>           }
>   
>           /* Aggregate the ports from all entries corresponding to this
>            * group.
>            */
> -        ovn_igmp_group_aggregate_ports(igmp_group, mcast_groups);
> +        ovn_igmp_group_aggregate_ports(igmp_group, &data->mcast_groups);
>       }
>   }
>   
> -void
> +static void
>   sync_multicast_groups_to_sb(
> -    struct ovsdb_idl_txn *ovnsb_txn,
> +    struct multicast_igmp_data *data, struct ovsdb_idl_txn *ovnsb_txn,
>       const struct sbrec_multicast_group_table *sbrec_multicast_group_table,
> -    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
> -    struct hmap *mcast_groups)
> +    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths)
>   {
> +    struct hmapx mcast_in_sb = HMAPX_INITIALIZER(&mcast_in_sb);
> +
>       /* Push changes to the Multicast_Group table to database. */
>       const struct sbrec_multicast_group *sbmc;
> -    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH_SAFE (
> -        sbmc, sbrec_multicast_group_table) {
> +    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH (sbmc, sbrec_multicast_group_table) {
>           struct ovn_datapath *od = ovn_datapath_from_sbrec(ls_datapaths,
>                                                             lr_datapaths,
>                                                             sbmc->datapath);

I think this needs to continue using the "_SAFE" variant because there 
is a condition within the loop where sbrec_multicast_group_delete() is 
called.

> @@ -306,55 +413,34 @@ sync_multicast_groups_to_sb(
>   
>           struct multicast_group group = { .name = sbmc->name,
>               .key = sbmc->tunnel_key };
> -        struct ovn_multicast *mc = ovn_multicast_find(mcast_groups,
> +        struct ovn_multicast *mc = ovn_multicast_find(&data->mcast_groups,
>                                                         od, &group);
>           if (mc) {
>               ovn_multicast_update_sbrec(mc, sbmc);
> -            ovn_multicast_destroy(mcast_groups, mc);
> +            hmapx_add(&mcast_in_sb, mc);
>           } else {
>               sbrec_multicast_group_delete(sbmc);

Right here ^^

>           }
>       }
>       struct ovn_multicast *mc;
> -    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
> +    HMAP_FOR_EACH_SAFE (mc, hmap_node, &data->mcast_groups) {
>           if (!mc->datapath) {
> -            ovn_multicast_destroy(mcast_groups, mc);
> +            ovn_multicast_destroy(&data->mcast_groups, mc);
> +            continue;
> +        }
> +
> +        if (hmapx_contains(&mcast_in_sb, mc)) {
>               continue;
>           }
> +
>           sbmc = create_sb_multicast_group(ovnsb_txn, mc->datapath->sb,
>                                            mc->group->name, mc->group->key);
>           ovn_multicast_update_sbrec(mc, sbmc);
> -        ovn_multicast_destroy(mcast_groups, mc);
>       }
>   
> -    hmap_destroy(mcast_groups);
> -}
> -
> -void
> -ovn_igmp_groups_destroy(struct hmap *igmp_groups)
> -{
> -    struct ovn_igmp_group *igmp_group;
> -    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> -        ovn_igmp_group_destroy(igmp_groups, igmp_group);
> -    }
> -    hmap_destroy(igmp_groups);
> +    hmapx_destroy(&mcast_in_sb);
>   }
>   
> -struct sbrec_multicast_group *
> -create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
> -                          const struct sbrec_datapath_binding *dp,
> -                          const char *name,
> -                          int64_t tunnel_key)
> -{
> -    struct sbrec_multicast_group *sbmc =
> -        sbrec_multicast_group_insert(ovnsb_txn);
> -    sbrec_multicast_group_set_datapath(sbmc, dp);
> -    sbrec_multicast_group_set_name(sbmc, name);
> -    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
> -    return sbmc;
> -}
> -
> -
>   static bool
>   multicast_group_equal(const struct multicast_group *a,
>                         const struct multicast_group *b)
> @@ -362,7 +448,6 @@ multicast_group_equal(const struct multicast_group *a,
>       return !strcmp(a->name, b->name) && a->key == b->key;
>   }
>   
> -
>   static uint32_t
>   ovn_multicast_hash(const struct ovn_datapath *datapath,
>                      const struct multicast_group *group)
> @@ -452,6 +537,15 @@ ovn_multicast_update_sbrec(const struct ovn_multicast *mc,
>       free(ports);
>   }
>   
> +static void
> +ovn_multicast_groups_destroy(struct hmap *mcast_groups)
> +{
> +    struct ovn_multicast *mc;
> +    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
> +        ovn_multicast_destroy(mcast_groups, mc);
> +    }
> +}
> +
>   static uint32_t
>   ovn_igmp_group_hash(const struct ovn_datapath *datapath,
>                       const struct in6_addr *address)
> @@ -644,3 +738,12 @@ ovn_igmp_group_destroy(struct hmap *igmp_groups,
>           free(igmp_group);
>       }
>   }
> +
> +static void
> +ovn_igmp_groups_destroy(struct hmap *igmp_groups)
> +{
> +    struct ovn_igmp_group *igmp_group;
> +    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> +        ovn_igmp_group_destroy(igmp_groups, igmp_group);
> +    }
> +}
> diff --git a/northd/en-multicast.h b/northd/en-multicast.h
> index 9a6848f78..3932b4b08 100644
> --- a/northd/en-multicast.h
> +++ b/northd/en-multicast.h
> @@ -70,20 +70,16 @@ struct ovn_igmp_group {
>       struct ovs_list entries; /* List of SB entries for this group. */
>   };
>   
> -void build_mcast_groups(
> -    const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
> -    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
> -    const struct hmap *ls_datapaths,
> -    const struct hmap *ls_ports,
> -    const struct hmap *lr_ports,
> -    struct hmap *mcast_groups,
> -    struct hmap *igmp_groups);
> -void sync_multicast_groups_to_sb(
> -    struct ovsdb_idl_txn *ovnsb_txn,
> -    const struct sbrec_multicast_group_table *sbrec_multicast_group_table,
> -    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
> -    struct hmap *mcast_groups);
> -void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
> +struct multicast_igmp_data {
> +    struct hmap mcast_groups;
> +    struct hmap igmp_groups;
> +    struct lflow_ref *lflow_ref;
> +};
> +
> +void *en_multicast_igmp_init(struct engine_node *,struct engine_arg *);
> +void en_multicast_igmp_run(struct engine_node *, void *);
> +bool multicast_igmp_northd_handler(struct engine_node *, void *);
> +void en_multicast_igmp_cleanup(void *);
>   struct sbrec_multicast_group *create_sb_multicast_group(
>       struct ovsdb_idl_txn *ovnsb_txn, const struct sbrec_datapath_binding *,
>       const char *name, int64_t tunnel_key);
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index 6e0aa04c4..a9990974c 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -34,6 +34,7 @@
>   #include "en-lr-stateful.h"
>   #include "en-lr-nat.h"
>   #include "en-ls-stateful.h"
> +#include "en-multicast.h"
>   #include "en-northd.h"
>   #include "en-lflow.h"
>   #include "en-northd-output.h"
> @@ -161,6 +162,7 @@ static ENGINE_NODE(route_policies, "route_policies");
>   static ENGINE_NODE(routes, "routes");
>   static ENGINE_NODE(bfd, "bfd");
>   static ENGINE_NODE(bfd_sync, "bfd_sync");
> +static ENGINE_NODE(multicast_igmp, "multicast_igmp");
>   
>   void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>                             struct ovsdb_idl_loop *sb)
> @@ -267,11 +269,15 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>       engine_add_input(&en_sync_meters, &en_nb_meter, NULL);
>       engine_add_input(&en_sync_meters, &en_sb_meter, NULL);
>   
> +    engine_add_input(&en_multicast_igmp, &en_northd,
> +                     multicast_igmp_northd_handler);
> +    engine_add_input(&en_multicast_igmp, &en_sb_multicast_group, NULL);
> +    engine_add_input(&en_multicast_igmp, &en_sb_igmp_group, NULL);
> +
>       engine_add_input(&en_lflow, &en_nb_acl, NULL);
>       engine_add_input(&en_lflow, &en_sync_meters, NULL);
>       engine_add_input(&en_lflow, &en_sb_logical_flow, NULL);
>       engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
> -    engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>       engine_add_input(&en_lflow, &en_sb_logical_dp_group, NULL);
>       engine_add_input(&en_lflow, &en_bfd_sync, NULL);
>       engine_add_input(&en_lflow, &en_route_policies, NULL);
> @@ -285,6 +291,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>       engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
>       engine_add_input(&en_lflow, &en_lr_stateful, lflow_lr_stateful_handler);
>       engine_add_input(&en_lflow, &en_ls_stateful, lflow_ls_stateful_handler);
> +    engine_add_input(&en_lflow, &en_multicast_igmp,
> +                     lflow_multicast_igmp_handler);
>   
>       engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>       engine_add_input(&en_sync_to_sb_addr_set, &en_lr_stateful, NULL);
> diff --git a/northd/northd.c b/northd/northd.c
> index 905f19ff1..4172f3396 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -16980,7 +16980,6 @@ struct lswitch_flow_build_info {
>       const struct lr_stateful_table *lr_stateful_table;
>       const struct ls_stateful_table *ls_stateful_table;
>       struct lflow_table *lflows;
> -    struct hmap *igmp_groups;
>       const struct shash *meter_groups;
>       const struct hmap *lb_dps_map;
>       const struct hmap *svc_monitor_map;
> @@ -17145,7 +17144,6 @@ build_lflows_thread(void *arg)
>       const struct lr_stateful_record *lr_stateful_rec;
>       const struct ls_stateful_record *ls_stateful_rec;
>       struct lswitch_flow_build_info *lsi;
> -    struct ovn_igmp_group *igmp_group;
>       struct ovn_lb_datapaths *lb_dps;
>       struct ovn_datapath *od;
>       struct ovn_port *op;
> @@ -17296,27 +17294,6 @@ build_lflows_thread(void *arg)
>                   }
>               }
>   
> -            for (bnum = control->id;
> -                    bnum <= lsi->igmp_groups->mask;
> -                    bnum += control->pool->size)
> -            {
> -                HMAP_FOR_EACH_IN_PARALLEL (
> -                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> -                    if (stop_parallel_processing()) {
> -                        return NULL;
> -                    }
> -                    if (igmp_group->datapath->nbs) {
> -                        build_lswitch_ip_mcast_igmp_mld(igmp_group,
> -                                                        lsi->lflows,
> -                                                        &lsi->actions,
> -                                                        &lsi->match, NULL);
> -                    } else {
> -                        build_igmp_flows_for_lrouter(igmp_group, lsi->lflows,
> -                                                     &lsi->actions,
> -                                                     &lsi->match, NULL);
> -                    }
> -                }
> -            }
>               lsi->thread_lflow_counter = thread_lflow_counter;
>           }
>           post_completed_work(control);
> @@ -17366,7 +17343,6 @@ build_lswitch_and_lrouter_flows(
>       const struct lr_stateful_table *lr_stateful_table,
>       const struct ls_stateful_table *ls_stateful_table,
>       struct lflow_table *lflows,
> -    struct hmap *igmp_groups,
>       const struct shash *meter_groups,
>       const struct hmap *lb_dps_map,
>       const struct hmap *svc_monitor_map,
> @@ -17401,7 +17377,6 @@ build_lswitch_and_lrouter_flows(
>               lsiv[index].ls_port_groups = ls_pgs;
>               lsiv[index].lr_stateful_table = lr_stateful_table;
>               lsiv[index].ls_stateful_table = ls_stateful_table;
> -            lsiv[index].igmp_groups = igmp_groups;
>               lsiv[index].meter_groups = meter_groups;
>               lsiv[index].lb_dps_map = lb_dps_map;
>               lsiv[index].svc_monitor_map = svc_monitor_map;
> @@ -17432,7 +17407,6 @@ build_lswitch_and_lrouter_flows(
>       } else {
>           const struct lr_stateful_record *lr_stateful_rec;
>           const struct ls_stateful_record *ls_stateful_rec;
> -        struct ovn_igmp_group *igmp_group;
>           struct ovn_lb_datapaths *lb_dps;
>           struct ovn_datapath *od;
>           struct ovn_port *op;
> @@ -17446,7 +17420,6 @@ build_lswitch_and_lrouter_flows(
>               .lr_stateful_table = lr_stateful_table,
>               .ls_stateful_table = ls_stateful_table,
>               .lflows = lflows,
> -            .igmp_groups = igmp_groups,
>               .meter_groups = meter_groups,
>               .lb_dps_map = lb_dps_map,
>               .svc_monitor_map = svc_monitor_map,
> @@ -17535,19 +17508,6 @@ build_lswitch_and_lrouter_flows(
>                                       lsi.features,
>                                       lsi.lflows);
>           }
> -        stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
> -        stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> -        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> -            if (igmp_group->datapath->nbs) {
> -                build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi.lflows,
> -                                                &lsi.actions, &lsi.match,
> -                                                NULL);
> -            } else {
> -                build_igmp_flows_for_lrouter(igmp_group, lsi.lflows,
> -                                             &lsi.actions, &lsi.match, NULL);
> -            }
> -        }
> -        stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
>   
>           ds_destroy(&lsi.match);
>           ds_destroy(&lsi.actions);
> @@ -17556,6 +17516,40 @@ build_lswitch_and_lrouter_flows(
>       free(svc_check_match);
>   }
>   
> +/* The IGMP flows have to be built in main thread because there is
> + * single lflow_ref for all of them which isn't thread safe.
> + * This shouldn't affect performance as there is a limited how many
> + * IGMP groups can be created. */
> +void
> +build_igmp_lflows(struct hmap *igmp_groups, const struct hmap *ls_datapaths,
> +                  struct lflow_table *lflows, struct lflow_ref *lflow_ref)
> +{
> +    struct ds actions = DS_EMPTY_INITIALIZER;
> +    struct ds match = DS_EMPTY_INITIALIZER;
> +
> +    struct ovn_datapath *od;
> +    HMAP_FOR_EACH (od, key_node, ls_datapaths) {
> +        init_mcast_flow_count(od);
> +        build_mcast_flood_lswitch(od, lflows, &actions, lflow_ref);
> +    }
> +
> +    stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> +    struct ovn_igmp_group *igmp_group;
> +    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> +        if (igmp_group->datapath->nbs) {
> +            build_lswitch_ip_mcast_igmp_mld(igmp_group, lflows, &actions,
> +                                            &match, lflow_ref);
> +        } else {
> +            build_igmp_flows_for_lrouter(igmp_group, lflows, &actions,
> +                                         &match, lflow_ref);
> +        }
> +    }
> +    stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> +
> +    ds_destroy(&actions);
> +    ds_destroy(&match);
> +}
> +
>   void run_update_worker_pool(int n_threads)
>   {
>       /* If number of threads has been updated (or initially set),
> @@ -17580,20 +17574,6 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
>                     struct lflow_input *input_data,
>                     struct lflow_table *lflows)
>   {
> -    struct hmap mcast_groups;
> -    struct hmap igmp_groups;
> -
> -    struct ovn_datapath *od;
> -    HMAP_FOR_EACH (od, key_node, &input_data->ls_datapaths->datapaths) {
> -        init_mcast_flow_count(od);
> -    }
> -
> -    build_mcast_groups(input_data->sbrec_igmp_group_table,
> -                       input_data->sbrec_mcast_group_by_name_dp,
> -                       &input_data->ls_datapaths->datapaths,
> -                       input_data->ls_ports, input_data->lr_ports,
> -                       &mcast_groups, &igmp_groups);
> -
>       build_lswitch_and_lrouter_flows(input_data->ls_datapaths,
>                                       input_data->lr_datapaths,
>                                       input_data->ls_ports,
> @@ -17602,7 +17582,6 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
>                                       input_data->lr_stateful_table,
>                                       input_data->ls_stateful_table,
>                                       lflows,
> -                                    &igmp_groups,
>                                       input_data->meter_groups,
>                                       input_data->lb_datapaths_map,
>                                       input_data->svc_monitor_map,
> @@ -17613,6 +17592,9 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
>                                       input_data->parsed_routes,
>                                       input_data->route_policies,
>                                       input_data->route_tables);
> +    build_igmp_lflows(input_data->igmp_groups,
> +                      &input_data->ls_datapaths->datapaths,
> +                      lflows, input_data->igmp_lflow_ref);
>   
>       if (parallelization_state == STATE_INIT_HASH_SIZES) {
>           parallelization_state = STATE_USE_PARALLELIZATION;
> @@ -17630,13 +17612,6 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
>                              input_data->sbrec_logical_dp_group_table);
>   
>       stopwatch_stop(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
> -
> -    sync_multicast_groups_to_sb(ovnsb_txn,
> -                                input_data->sbrec_multicast_group_table,
> -                                &input_data->ls_datapaths->datapaths,
> -                                &input_data->lr_datapaths->datapaths,
> -                                &mcast_groups);
> -    ovn_igmp_groups_destroy(&igmp_groups);
>   }
>   
>   void
> diff --git a/northd/northd.h b/northd/northd.h
> index 218f0f62d..6bca6bb1a 100644
> --- a/northd/northd.h
> +++ b/northd/northd.h
> @@ -198,13 +198,12 @@ struct bfd_sync_data {
>       struct sset bfd_ports;
>   };
>   
> +struct lflow_ref;
>   struct lr_nat_table;
>   
>   struct lflow_input {
>       /* Southbound table references */
>       const struct sbrec_logical_flow_table *sbrec_logical_flow_table;
> -    const struct sbrec_multicast_group_table *sbrec_multicast_group_table;
> -    const struct sbrec_igmp_group_table *sbrec_igmp_group_table;
>       const struct sbrec_logical_dp_group_table *sbrec_logical_dp_group_table;
>   
>       /* Indexes */
> @@ -228,6 +227,8 @@ struct lflow_input {
>       struct hmap *parsed_routes;
>       struct hmap *route_policies;
>       struct simap *route_tables;
> +    struct hmap *igmp_groups;
> +    struct lflow_ref *igmp_lflow_ref;
>   };
>   
>   extern int parallelization_state;
> @@ -896,5 +897,8 @@ lsp_is_router(const struct nbrec_logical_switch_port *nbsp)
>   }
>   
>   struct ovn_port *ovn_port_find(const struct hmap *ports, const char *name);
> -
> +void build_igmp_lflows(struct hmap *igmp_groups,
> +                       const struct hmap *ls_datapaths,
> +                       struct lflow_table *lflows,
> +                       struct lflow_ref *lflow_ref);
>   #endif /* NORTHD_H */
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 507cc302f..91ba5b736 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -14384,3 +14384,92 @@ AT_CHECK([ovn-sbctl lflow-list S1 | grep ls_out_acl_action | grep priority=500 |
>   
>   AT_CLEANUP
>   ])
> +
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([IGMP incremental processing])
> +
> +check_recompute_counter() {
> +    lflow_recomp=$(as northd ovn-appctl -t ovn-northd inc-engine/show-stats lflow recompute)
> +    AT_CHECK([test x$lflow_recomp = x$1])
> +}
> +ovn_start
> +
> +net_add n1
> +sim_add hv1
> +as hv1
> +
> +ovs-vsctl add-br br-phys
> +ovn_attach n1 br-phys 192.168.0.11
> +
> +sim_add hv2
> +as hv2
> +
> +check ovs-vsctl add-br br-phys
> +ovn_attach n1 br-phys 192.168.0.2
> +
> +check ovn-nbctl ls-add sw1
> +check ovn-nbctl ls-add sw2
> +
> +check ovn-nbctl lsp-add sw1 sw1-p11
> +check ovn-nbctl lsp-add sw2 sw2-p21
> +
> +check ovn-nbctl lr-add rtr
> +check ovn-nbctl lrp-add rtr rtr-sw1 00:00:00:00:01:00 10.0.0.254/24
> +check ovn-nbctl lrp-add rtr rtr-sw2 00:00:00:00:02:00 10.0.0.254/24
> +
> +
> +check ovn-nbctl lsp-add sw1 sw1-rtr \
> +    -- lsp-set-type sw1-rtr router  \
> +    -- lsp-set-addresses sw1-rtr 00:00:00:00:01:00 \
> +    -- lsp-set-options sw1-rtr router-port=rtr-sw1
> +
> +check ovn-nbctl lsp-add sw2 sw2-rtr \
> +    -- lsp-set-type sw2-rtr router  \
> +    -- lsp-set-addresses sw1-rtr 00:00:00:00:02:00 \
> +    -- lsp-set-options sw2-rtr router-port=rtr-sw2
> +
> +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> +
> +# Create IGMP_Group 239.0.1.68 with port sw1-p11
> +ovn-sbctl create IGMP_Group address=239.0.1.68 \
> +    datapath=$(fetch_column Datapath_Binding _uuid external_ids:name=sw1) \
> +    chassis=$(fetch_column Chassis _uuid name=hv1) \
> +    chassis_name=hv1 \
> +    ports=$(fetch_column Port_Binding _uuid logical_port=sw1-p11)
> +igmp_uuid=$(fetch_column IGMP_GROUP _uuid address=239.0.1.68)
> +
> +check ovn-nbctl --wait=sb sync
> +wait_row_count Igmp_Group 1 address=239.0.1.68
> +wait_row_count Multicast_Group 1 name="239.0.1.68"
> +wait_row_count Multicast_Group  1 name="239.0.1.68" ports='[['$(fetch_column Port_Binding _uuid logical_port=sw1-p11)']]'
> +ovn-sbctl list igmp_group
> +check_recompute_counter 0
> +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> +
> +check ovn-nbctl set logical_router rtr \
> +    options:mcast_relay="true"
> +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> +# Update IGMP_Group 239.0.1.68 to include sw2-p21
> +ovn-sbctl add IGMP_Group $igmp_uuid ports $(fetch_column Port_Binding _uuid logical_port=sw2-p21)
> +
> +check ovn-nbctl --wait=sb sync
> +wait_row_count IGMP_Group 1 address=239.0.1.68
> +
> +# Check that new Multicast_Group is created
> +wait_row_count Multicast_Group 2 name=239.0.1.68
> +check_recompute_counter 0
> +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> +
> +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> +# Delete IGMP_Group 239.0.1.68
> +ovn-sbctl destroy IGMP_Group $igmp_uuid
> +check ovn-nbctl --wait=sb sync
> +check_recompute_counter 0
> +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> +
> +wait_row_count IGMP_Group 0 address=239.0.1.68
> +wait_row_count Multicast_Group 0 name=239.0.1.68
> +
> +OVN_CLEANUP([hv1], [hv2])
> +AT_CLEANUP
> +])
Ales Musil Jan. 28, 2025, 6:38 a.m. UTC | #2
On Mon, Jan 27, 2025 at 8:06 PM Mark Michelson <mmichels@redhat.com> wrote:

> Hi Ales,
>
> I have one finding below.
>


Hi Mark,

thank you for the review.


> On 1/16/25 07:25, Ales Musil wrote:
> > Add new I-P node that will store all the data for IGMP and
> > Multicast groups. This node allows to avoid full recompute of lflow
> > node when IGMP or Multicast SB table changes.
> >
> > The node itself still does full recompute for IGMP and Multicast
> > changes however this is a compromise between code complexity and
> > the time it takes for all lflow to be created. At the same time
> > thi brings the benefit of having the data available when there
> > is recompute of the lflow node.
> >
> > As design choice there is only single lflow_ref for all IGMP
> > lflows, that makes them not being thread safe and only main thread
> > can generate them during full recompute of lflow node. This shouldn't
> > be an issue, because the computation of igmp lflow is pretty simple.
> >
> > Reported-at: https://issues.redhat.com/browse/FDP-756
> > Co-authored-by: Jacob Tanenbaum <jtanenba@redhat.com>
> > Signed-off-by: Jacob Tanenbaum <jtanenba@redhat.com>
> > Suggested-by: Dumitru Ceara <dceara@redhat.com>
> > Signed-off-by: Ales Musil <amusil@redhat.com>
> > ---
> >   northd/en-lflow.c        |  52 ++++++++-
> >   northd/en-lflow.h        |   1 +
> >   northd/en-multicast.c    | 223 ++++++++++++++++++++++++++++-----------
> >   northd/en-multicast.h    |  24 ++---
> >   northd/inc-proc-northd.c |  10 +-
> >   northd/northd.c          |  99 +++++++----------
> >   northd/northd.h          |  10 +-
> >   tests/ovn-northd.at      |  89 ++++++++++++++++
> >   8 files changed, 364 insertions(+), 144 deletions(-)
> >
> > diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> > index fa1f0236d..e2816f4da 100644
> > --- a/northd/en-lflow.c
> > +++ b/northd/en-lflow.c
> > @@ -23,6 +23,7 @@
> >   #include "en-lr-nat.h"
> >   #include "en-lr-stateful.h"
> >   #include "en-ls-stateful.h"
> > +#include "en-multicast.h"
> >   #include "en-northd.h"
> >   #include "en-meters.h"
> >   #include "en-sampling-app.h"
> > @@ -56,13 +57,11 @@ lflow_get_input_data(struct engine_node *node,
> >           engine_get_input_data("lr_stateful", node);
> >       struct ed_type_ls_stateful *ls_stateful_data =
> >           engine_get_input_data("ls_stateful", node);
> > +    struct multicast_igmp_data *multicat_igmp_data =
> > +        engine_get_input_data("multicast_igmp", node);
> >
> >       lflow_input->sbrec_logical_flow_table =
> >           EN_OVSDB_GET(engine_get_input("SB_logical_flow", node));
> > -    lflow_input->sbrec_multicast_group_table =
> > -        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
> > -    lflow_input->sbrec_igmp_group_table =
> > -        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
> >       lflow_input->sbrec_logical_dp_group_table =
> >           EN_OVSDB_GET(engine_get_input("SB_logical_dp_group", node));
> >
> > @@ -85,6 +84,8 @@ lflow_get_input_data(struct engine_node *node,
> >       lflow_input->parsed_routes = &routes_data->parsed_routes;
> >       lflow_input->route_tables = &routes_data->route_tables;
> >       lflow_input->route_policies = &route_policies_data->route_policies;
> > +    lflow_input->igmp_groups = &multicat_igmp_data->igmp_groups;
> > +    lflow_input->igmp_lflow_ref = multicat_igmp_data->lflow_ref;
> >
> >       struct ed_type_global_config *global_config =
> >           engine_get_input_data("global_config", node);
> > @@ -110,6 +111,7 @@ void en_lflow_run(struct engine_node *node, void
> *data)
> >       struct lflow_data *lflow_data = data;
> >       lflow_table_clear(lflow_data->lflow_table);
> >       lflow_reset_northd_refs(&lflow_input);
> > +    lflow_ref_clear(lflow_input.igmp_lflow_ref);
> >
> >       build_lflows(eng_ctx->ovnsb_idl_txn, &lflow_input,
> >                    lflow_data->lflow_table);
> > @@ -219,6 +221,48 @@ lflow_ls_stateful_handler(struct engine_node *node,
> void *data)
> >       return true;
> >   }
> >
> > +bool
> > +lflow_multicast_igmp_handler(struct engine_node *node, void *data)
> > +{
> > +    struct multicast_igmp_data *mcast_igmp_data =
> > +        engine_get_input_data("multicast_igmp", node);
> > +
> > +    const struct engine_context *eng_ctx = engine_get_context();
> > +    struct lflow_data *lflow_data = data;
> > +    struct lflow_input lflow_input;
> > +    lflow_get_input_data(node, &lflow_input);
> > +
> > +    if (!lflow_ref_resync_flows(mcast_igmp_data->lflow_ref,
> > +                                lflow_data->lflow_table,
> > +                                eng_ctx->ovnsb_idl_txn,
> > +                                lflow_input.ls_datapaths,
> > +                                lflow_input.lr_datapaths,
> > +
> lflow_input.ovn_internal_version_changed,
> > +                                lflow_input.sbrec_logical_flow_table,
> > +
> lflow_input.sbrec_logical_dp_group_table)) {
> > +        return false;
> > +    }
> > +
> > +    build_igmp_lflows(&mcast_igmp_data->igmp_groups,
> > +                      &lflow_input.ls_datapaths->datapaths,
> > +                      lflow_data->lflow_table,
> > +                      mcast_igmp_data->lflow_ref);
> > +
> > +    if (!lflow_ref_sync_lflows(mcast_igmp_data->lflow_ref,
> > +                               lflow_data->lflow_table,
> > +                               eng_ctx->ovnsb_idl_txn,
> > +                               lflow_input.ls_datapaths,
> > +                               lflow_input.lr_datapaths,
> > +                               lflow_input.ovn_internal_version_changed,
> > +                               lflow_input.sbrec_logical_flow_table,
> > +
>  lflow_input.sbrec_logical_dp_group_table)) {
> > +        return false;
> > +    }
> > +
> > +    engine_set_node_state(node, EN_UPDATED);
> > +    return true;
> > +}
> > +
> >   void *en_lflow_init(struct engine_node *node OVS_UNUSED,
> >                        struct engine_arg *arg OVS_UNUSED)
> >   {
> > diff --git a/northd/en-lflow.h b/northd/en-lflow.h
> > index 32cae6176..f90f5c61c 100644
> > --- a/northd/en-lflow.h
> > +++ b/northd/en-lflow.h
> > @@ -22,5 +22,6 @@ bool lflow_northd_handler(struct engine_node *, void
> *data);
> >   bool lflow_port_group_handler(struct engine_node *, void *data);
> >   bool lflow_lr_stateful_handler(struct engine_node *, void *data);
> >   bool lflow_ls_stateful_handler(struct engine_node *node, void *data);
> > +bool lflow_multicast_igmp_handler(struct engine_node *node, void *data);
> >
> >   #endif /* EN_LFLOW_H */
> > diff --git a/northd/en-multicast.c b/northd/en-multicast.c
> > index 0f07cf2fe..59a36f38b 100644
> > --- a/northd/en-multicast.c
> > +++ b/northd/en-multicast.c
> > @@ -22,6 +22,7 @@
> >
> >   /* OVN includes. */
> >   #include "en-multicast.h"
> > +#include "lflow-mgr.h"
> >   #include "lib/ip-mcast-index.h"
> >   #include "lib/mcast-group-index.h"
> >   #include "lib/ovn-l7.h"
> > @@ -47,6 +48,16 @@ static const struct multicast_group mc_unknown =
> >   static const struct multicast_group mc_flood_l2 =
> >       { MC_FLOOD_L2, OVN_MCAST_FLOOD_L2_TUNNEL_KEY };
> >
> > +static void build_mcast_groups(
> > +    struct multicast_igmp_data *, const struct sbrec_igmp_group_table *,
> > +    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
> > +    const struct hmap *ls_datapaths, const struct hmap *ls_ports,
> > +    const struct hmap *lr_ports);
> > +static void sync_multicast_groups_to_sb(
> > +    struct multicast_igmp_data *, struct ovsdb_idl_txn *,
> > +    const struct sbrec_multicast_group_table *,
> > +    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths);
> > +
> >   static bool multicast_group_equal(const struct multicast_group *,
> >                                     const struct multicast_group *);
> >   static uint32_t ovn_multicast_hash(const struct ovn_datapath *,
> > @@ -65,6 +76,7 @@ static void ovn_multicast_destroy(struct hmap
> *mcgroups,
> >                                     struct ovn_multicast *);
> >   static void ovn_multicast_update_sbrec(const struct ovn_multicast *,
> >                                          const struct
> sbrec_multicast_group *);
> > +static void ovn_multicast_groups_destroy(struct hmap *mcast_groups);
> >
> >   static uint32_t ovn_igmp_group_hash(const struct ovn_datapath *,
> >                                       const struct in6_addr *);
> > @@ -88,29 +100,123 @@ static void ovn_igmp_group_aggregate_ports(struct
> ovn_igmp_group *,
> >                                              struct hmap *mcast_groups);
> >   static void ovn_igmp_group_destroy(struct hmap *igmp_groups,
> >                                      struct ovn_igmp_group *);
> > +static void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
> > +
> > +void *
> > +en_multicast_igmp_init(struct engine_node *node OVS_UNUSED,
> > +                       struct engine_arg *arg OVS_UNUSED)
> > +{
> > +    struct multicast_igmp_data *data =xmalloc(sizeof *data);
> > +    hmap_init(&data->mcast_groups);
> > +    hmap_init(&data->igmp_groups);
> > +    data->lflow_ref = lflow_ref_create();
> > +
> > +    return data;
> > +}
> >
> >   void
> > -build_mcast_groups(const struct sbrec_igmp_group_table
> *sbrec_igmp_group_table,
> > +en_multicast_igmp_run(struct engine_node *node, void *data_)
> > +{
> > +    struct multicast_igmp_data *data = data_;
> > +    struct northd_data *northd_data = engine_get_input_data("northd",
> node);
> > +    const struct sbrec_igmp_group_table *sbrec_igmp_group_table =
> > +        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
> > +    const struct sbrec_multicast_group_table
> *sbrec_multicast_group_table =
> > +        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
> > +    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp =
> > +        engine_ovsdb_node_get_index(
> > +            engine_get_input("SB_multicast_group", node),
> > +            "sbrec_mcast_group_by_name");
> > +    const struct engine_context *eng_ctx = engine_get_context();
> > +
> > +    ovn_multicast_groups_destroy(&data->mcast_groups);
> > +    ovn_igmp_groups_destroy(&data->igmp_groups);
> > +
> > +    build_mcast_groups(data, sbrec_igmp_group_table,
> > +                      sbrec_mcast_group_by_name_dp,
> > +                      &northd_data->ls_datapaths.datapaths,
> > +                      &northd_data->ls_ports,
> > +                      &northd_data->lr_ports);
> > +    sync_multicast_groups_to_sb(data, eng_ctx->ovnsb_idl_txn,
> > +                                sbrec_multicast_group_table,
> > +                                &northd_data->ls_datapaths.datapaths,
> > +                                &northd_data->lr_datapaths.datapaths);
> > +
> > +    engine_set_node_state(node, EN_UPDATED);
> > +}
> > +
> > +bool
> > +multicast_igmp_northd_handler(struct engine_node *node, void *data
> OVS_UNUSED)
> > +{
> > +    struct northd_data *northd_data = engine_get_input_data("northd",
> node);
> > +    if (!northd_has_tracked_data(&northd_data->trk_data)) {
> > +        return false;
> > +    }
> > +
> > +    /* This node uses the below data from the en_northd engine node.
> > +     *      - northd_data->lr_datapaths
> > +     *      - northd_data->ls_ports
> > +     *      - northd_data->lr_ports
> > +     *
> > +     *      This data gets updated when a logical router is created or
> deleted.
> > +     *      northd engine node presently falls back to full recompute
> when
> > +     *      this happens and so does this node.
> > +     *      Note: When we add I-P to the created/deleted logical
> routers, we
> > +     *      need to revisit this handler.
> > +     *
> > +     *      This node also accesses the router ports of the logical
> router
> > +     *      (od->ports).  When these logical router ports gets updated,
> > +     *      en_northd engine recomputes and so does this node.
> > +     *      Note: When we add I-P to handle switch/router port changes,
> we
> > +     *      need to revisit this handler.
> > +     *
> > +     * */
> > +    return true;
> > +}
> > +
> > +void
> > +en_multicast_igmp_cleanup(void *data_)
> > +{
> > +    struct multicast_igmp_data *data = data_;
> > +
> > +    ovn_multicast_groups_destroy(&data->mcast_groups);
> > +    ovn_igmp_groups_destroy(&data->igmp_groups);
> > +    hmap_destroy(&data->mcast_groups);
> > +    hmap_destroy(&data->igmp_groups);
> > +    lflow_ref_destroy(data->lflow_ref);
> > +}
> > +
> > +struct sbrec_multicast_group *
> > +create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
> > +                          const struct sbrec_datapath_binding *dp,
> > +                          const char *name,
> > +                          int64_t tunnel_key)
> > +{
> > +    struct sbrec_multicast_group *sbmc =
> > +        sbrec_multicast_group_insert(ovnsb_txn);
> > +    sbrec_multicast_group_set_datapath(sbmc, dp);
> > +    sbrec_multicast_group_set_name(sbmc, name);
> > +    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
> > +    return sbmc;
> > +}
> > +
> > +static void
> > +build_mcast_groups(struct multicast_igmp_data *data,
> > +                   const struct sbrec_igmp_group_table
> *sbrec_igmp_group_table,
> >                      struct ovsdb_idl_index
> *sbrec_mcast_group_by_name_dp,
> >                      const struct hmap *ls_datapaths,
> >                      const struct hmap *ls_ports,
> > -                   const struct hmap *lr_ports,
> > -                   struct hmap *mcast_groups,
> > -                   struct hmap *igmp_groups)
> > -{
> > +                   const struct hmap *lr_ports) {
> >       struct ovn_datapath *od;
> >       struct ovn_port *op;
> >
> > -    hmap_init(mcast_groups);
> > -    hmap_init(igmp_groups);
> > -
> >       HMAP_FOR_EACH (op, key_node, lr_ports) {
> >           if (lrport_is_enabled(op->nbrp)) {
> >               /* If this port is configured to always flood multicast
> traffic
> >                * add it to the MC_STATIC group.
> >                */
> >               if (op->mcast_info.flood) {
> > -                ovn_multicast_add(mcast_groups, &mc_static, op);
> > +                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
> >                   op->od->mcast_info.rtr.flood_static = true;
> >               }
> >           }
> > @@ -118,14 +224,14 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >
> >       HMAP_FOR_EACH (op, key_node, ls_ports) {
> >           if (lsp_is_enabled(op->nbsp)) {
> > -            ovn_multicast_add(mcast_groups, &mc_flood, op);
> > +            ovn_multicast_add(&data->mcast_groups, &mc_flood, op);
> >
> >               if (!lsp_is_router(op->nbsp)) {
> > -                ovn_multicast_add(mcast_groups, &mc_flood_l2, op);
> > +                ovn_multicast_add(&data->mcast_groups, &mc_flood_l2,
> op);
> >               }
> >
> >               if (op->has_unknown) {
> > -                ovn_multicast_add(mcast_groups, &mc_unknown, op);
> > +                ovn_multicast_add(&data->mcast_groups, &mc_unknown, op);
> >               }
> >
> >               /* If this port is connected to a multicast router then
> add it
> > @@ -133,7 +239,7 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >                */
> >               if (op->od->mcast_info.sw.flood_relay && op->peer &&
> >                   op->peer->od && op->peer->od->mcast_info.rtr.relay) {
> > -                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
> > +                ovn_multicast_add(&data->mcast_groups,
> &mc_mrouter_flood, op);
> >               }
> >
> >               /* If this port is configured to always flood multicast
> reports
> > @@ -141,7 +247,7 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >                * flooded to statically configured or learned mrouters).
> >                */
> >               if (op->mcast_info.flood_reports) {
> > -                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
> > +                ovn_multicast_add(&data->mcast_groups,
> &mc_mrouter_flood, op);
> >                   op->od->mcast_info.sw.flood_reports = true;
> >               }
> >
> > @@ -149,7 +255,7 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >                * add it to the MC_STATIC group.
> >                */
> >               if (op->mcast_info.flood) {
> > -                ovn_multicast_add(mcast_groups, &mc_static, op);
> > +                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
> >                   op->od->mcast_info.sw.flood_static = true;
> >               }
> >           }
> > @@ -202,7 +308,8 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >            * if the multicast group already exists.
> >            */
> >           struct ovn_igmp_group *igmp_group =
> > -            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
> igmp_groups, od,
> > +            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
> > +                               &data->igmp_groups, od,
> >                                  &group_address, sb_igmp->address);
> >
> >           /* Add the extracted ports to the IGMP group. */
> > @@ -240,7 +347,7 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >
> >               struct ovn_igmp_group *igmp_group_rtr =
> >                   ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
> > -                                   igmp_groups, router_port->od,
> > +                                   &data->igmp_groups, router_port->od,
> >                                      &group_address, igmp_group->
> mcgroup.name);
> >               struct ovn_port **router_igmp_ports =
> >                   xmalloc(sizeof *router_igmp_ports);
> > @@ -260,41 +367,41 @@ build_mcast_groups(const struct
> sbrec_igmp_group_table *sbrec_igmp_group_table,
> >        * explicitly.
> >        */
> >       struct ovn_igmp_group *igmp_group;
> > -    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> > +    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, &data->igmp_groups) {
> >
> >           /* If this is a mrouter entry just aggregate the mrouter ports
> >            * into the MC_MROUTER mcast_group and destroy the igmp_group;
> >            * no more processing needed. */
> >           if (!strcmp(igmp_group->mcgroup.name,
> OVN_IGMP_GROUP_MROUTERS)) {
> > -            ovn_igmp_mrouter_aggregate_ports(igmp_group, mcast_groups);
> > -            ovn_igmp_group_destroy(igmp_groups, igmp_group);
> > +            ovn_igmp_mrouter_aggregate_ports(igmp_group,
> &data->mcast_groups);
> > +            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
> >               continue;
> >           }
> >
> >           if (!ovn_igmp_group_allocate_id(igmp_group)) {
> >               /* If we ran out of keys just destroy the entry. */
> > -            ovn_igmp_group_destroy(igmp_groups, igmp_group);
> > +            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
> >               continue;
> >           }
> >
> >           /* Aggregate the ports from all entries corresponding to this
> >            * group.
> >            */
> > -        ovn_igmp_group_aggregate_ports(igmp_group, mcast_groups);
> > +        ovn_igmp_group_aggregate_ports(igmp_group, &data->mcast_groups);
> >       }
> >   }
> >
> > -void
> > +static void
> >   sync_multicast_groups_to_sb(
> > -    struct ovsdb_idl_txn *ovnsb_txn,
> > +    struct multicast_igmp_data *data, struct ovsdb_idl_txn *ovnsb_txn,
> >       const struct sbrec_multicast_group_table
> *sbrec_multicast_group_table,
> > -    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
> > -    struct hmap *mcast_groups)
> > +    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths)
> >   {
> > +    struct hmapx mcast_in_sb = HMAPX_INITIALIZER(&mcast_in_sb);
> > +
> >       /* Push changes to the Multicast_Group table to database. */
> >       const struct sbrec_multicast_group *sbmc;
> > -    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH_SAFE (
> > -        sbmc, sbrec_multicast_group_table) {
> > +    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH (sbmc,
> sbrec_multicast_group_table) {
> >           struct ovn_datapath *od = ovn_datapath_from_sbrec(ls_datapaths,
> >                                                             lr_datapaths,
> >
>  sbmc->datapath);
>
> I think this needs to continue using the "_SAFE" variant because there
> is a condition within the loop where sbrec_multicast_group_delete() is
> called.
>

You are right, I'm not sure why it was removed in the first place, but I'll
add it back.


> > @@ -306,55 +413,34 @@ sync_multicast_groups_to_sb(
> >
> >           struct multicast_group group = { .name = sbmc->name,
> >               .key = sbmc->tunnel_key };
> > -        struct ovn_multicast *mc = ovn_multicast_find(mcast_groups,
> > +        struct ovn_multicast *mc =
> ovn_multicast_find(&data->mcast_groups,
> >                                                         od, &group);
> >           if (mc) {
> >               ovn_multicast_update_sbrec(mc, sbmc);
> > -            ovn_multicast_destroy(mcast_groups, mc);
> > +            hmapx_add(&mcast_in_sb, mc);
> >           } else {
> >               sbrec_multicast_group_delete(sbmc);
>
> Right here ^^
>
> >           }
> >       }
> >       struct ovn_multicast *mc;
> > -    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
> > +    HMAP_FOR_EACH_SAFE (mc, hmap_node, &data->mcast_groups) {
> >           if (!mc->datapath) {
> > -            ovn_multicast_destroy(mcast_groups, mc);
> > +            ovn_multicast_destroy(&data->mcast_groups, mc);
> > +            continue;
> > +        }
> > +
> > +        if (hmapx_contains(&mcast_in_sb, mc)) {
> >               continue;
> >           }
> > +
> >           sbmc = create_sb_multicast_group(ovnsb_txn, mc->datapath->sb,
> >                                            mc->group->name,
> mc->group->key);
> >           ovn_multicast_update_sbrec(mc, sbmc);
> > -        ovn_multicast_destroy(mcast_groups, mc);
> >       }
> >
> > -    hmap_destroy(mcast_groups);
> > -}
> > -
> > -void
> > -ovn_igmp_groups_destroy(struct hmap *igmp_groups)
> > -{
> > -    struct ovn_igmp_group *igmp_group;
> > -    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> > -        ovn_igmp_group_destroy(igmp_groups, igmp_group);
> > -    }
> > -    hmap_destroy(igmp_groups);
> > +    hmapx_destroy(&mcast_in_sb);
> >   }
> >
> > -struct sbrec_multicast_group *
> > -create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
> > -                          const struct sbrec_datapath_binding *dp,
> > -                          const char *name,
> > -                          int64_t tunnel_key)
> > -{
> > -    struct sbrec_multicast_group *sbmc =
> > -        sbrec_multicast_group_insert(ovnsb_txn);
> > -    sbrec_multicast_group_set_datapath(sbmc, dp);
> > -    sbrec_multicast_group_set_name(sbmc, name);
> > -    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
> > -    return sbmc;
> > -}
> > -
> > -
> >   static bool
> >   multicast_group_equal(const struct multicast_group *a,
> >                         const struct multicast_group *b)
> > @@ -362,7 +448,6 @@ multicast_group_equal(const struct multicast_group
> *a,
> >       return !strcmp(a->name, b->name) && a->key == b->key;
> >   }
> >
> > -
> >   static uint32_t
> >   ovn_multicast_hash(const struct ovn_datapath *datapath,
> >                      const struct multicast_group *group)
> > @@ -452,6 +537,15 @@ ovn_multicast_update_sbrec(const struct
> ovn_multicast *mc,
> >       free(ports);
> >   }
> >
> > +static void
> > +ovn_multicast_groups_destroy(struct hmap *mcast_groups)
> > +{
> > +    struct ovn_multicast *mc;
> > +    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
> > +        ovn_multicast_destroy(mcast_groups, mc);
> > +    }
> > +}
> > +
> >   static uint32_t
> >   ovn_igmp_group_hash(const struct ovn_datapath *datapath,
> >                       const struct in6_addr *address)
> > @@ -644,3 +738,12 @@ ovn_igmp_group_destroy(struct hmap *igmp_groups,
> >           free(igmp_group);
> >       }
> >   }
> > +
> > +static void
> > +ovn_igmp_groups_destroy(struct hmap *igmp_groups)
> > +{
> > +    struct ovn_igmp_group *igmp_group;
> > +    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
> > +        ovn_igmp_group_destroy(igmp_groups, igmp_group);
> > +    }
> > +}
> > diff --git a/northd/en-multicast.h b/northd/en-multicast.h
> > index 9a6848f78..3932b4b08 100644
> > --- a/northd/en-multicast.h
> > +++ b/northd/en-multicast.h
> > @@ -70,20 +70,16 @@ struct ovn_igmp_group {
> >       struct ovs_list entries; /* List of SB entries for this group. */
> >   };
> >
> > -void build_mcast_groups(
> > -    const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
> > -    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
> > -    const struct hmap *ls_datapaths,
> > -    const struct hmap *ls_ports,
> > -    const struct hmap *lr_ports,
> > -    struct hmap *mcast_groups,
> > -    struct hmap *igmp_groups);
> > -void sync_multicast_groups_to_sb(
> > -    struct ovsdb_idl_txn *ovnsb_txn,
> > -    const struct sbrec_multicast_group_table
> *sbrec_multicast_group_table,
> > -    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
> > -    struct hmap *mcast_groups);
> > -void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
> > +struct multicast_igmp_data {
> > +    struct hmap mcast_groups;
> > +    struct hmap igmp_groups;
> > +    struct lflow_ref *lflow_ref;
> > +};
> > +
> > +void *en_multicast_igmp_init(struct engine_node *,struct engine_arg *);
> > +void en_multicast_igmp_run(struct engine_node *, void *);
> > +bool multicast_igmp_northd_handler(struct engine_node *, void *);
> > +void en_multicast_igmp_cleanup(void *);
> >   struct sbrec_multicast_group *create_sb_multicast_group(
> >       struct ovsdb_idl_txn *ovnsb_txn, const struct
> sbrec_datapath_binding *,
> >       const char *name, int64_t tunnel_key);
> > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> > index 6e0aa04c4..a9990974c 100644
> > --- a/northd/inc-proc-northd.c
> > +++ b/northd/inc-proc-northd.c
> > @@ -34,6 +34,7 @@
> >   #include "en-lr-stateful.h"
> >   #include "en-lr-nat.h"
> >   #include "en-ls-stateful.h"
> > +#include "en-multicast.h"
> >   #include "en-northd.h"
> >   #include "en-lflow.h"
> >   #include "en-northd-output.h"
> > @@ -161,6 +162,7 @@ static ENGINE_NODE(route_policies, "route_policies");
> >   static ENGINE_NODE(routes, "routes");
> >   static ENGINE_NODE(bfd, "bfd");
> >   static ENGINE_NODE(bfd_sync, "bfd_sync");
> > +static ENGINE_NODE(multicast_igmp, "multicast_igmp");
> >
> >   void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >                             struct ovsdb_idl_loop *sb)
> > @@ -267,11 +269,15 @@ void inc_proc_northd_init(struct ovsdb_idl_loop
> *nb,
> >       engine_add_input(&en_sync_meters, &en_nb_meter, NULL);
> >       engine_add_input(&en_sync_meters, &en_sb_meter, NULL);
> >
> > +    engine_add_input(&en_multicast_igmp, &en_northd,
> > +                     multicast_igmp_northd_handler);
> > +    engine_add_input(&en_multicast_igmp, &en_sb_multicast_group, NULL);
> > +    engine_add_input(&en_multicast_igmp, &en_sb_igmp_group, NULL);
> > +
> >       engine_add_input(&en_lflow, &en_nb_acl, NULL);
> >       engine_add_input(&en_lflow, &en_sync_meters, NULL);
> >       engine_add_input(&en_lflow, &en_sb_logical_flow, NULL);
> >       engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
> > -    engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
> >       engine_add_input(&en_lflow, &en_sb_logical_dp_group, NULL);
> >       engine_add_input(&en_lflow, &en_bfd_sync, NULL);
> >       engine_add_input(&en_lflow, &en_route_policies, NULL);
> > @@ -285,6 +291,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >       engine_add_input(&en_lflow, &en_port_group,
> lflow_port_group_handler);
> >       engine_add_input(&en_lflow, &en_lr_stateful,
> lflow_lr_stateful_handler);
> >       engine_add_input(&en_lflow, &en_ls_stateful,
> lflow_ls_stateful_handler);
> > +    engine_add_input(&en_lflow, &en_multicast_igmp,
> > +                     lflow_multicast_igmp_handler);
> >
> >       engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
> >       engine_add_input(&en_sync_to_sb_addr_set, &en_lr_stateful, NULL);
> > diff --git a/northd/northd.c b/northd/northd.c
> > index 905f19ff1..4172f3396 100644
> > --- a/northd/northd.c
> > +++ b/northd/northd.c
> > @@ -16980,7 +16980,6 @@ struct lswitch_flow_build_info {
> >       const struct lr_stateful_table *lr_stateful_table;
> >       const struct ls_stateful_table *ls_stateful_table;
> >       struct lflow_table *lflows;
> > -    struct hmap *igmp_groups;
> >       const struct shash *meter_groups;
> >       const struct hmap *lb_dps_map;
> >       const struct hmap *svc_monitor_map;
> > @@ -17145,7 +17144,6 @@ build_lflows_thread(void *arg)
> >       const struct lr_stateful_record *lr_stateful_rec;
> >       const struct ls_stateful_record *ls_stateful_rec;
> >       struct lswitch_flow_build_info *lsi;
> > -    struct ovn_igmp_group *igmp_group;
> >       struct ovn_lb_datapaths *lb_dps;
> >       struct ovn_datapath *od;
> >       struct ovn_port *op;
> > @@ -17296,27 +17294,6 @@ build_lflows_thread(void *arg)
> >                   }
> >               }
> >
> > -            for (bnum = control->id;
> > -                    bnum <= lsi->igmp_groups->mask;
> > -                    bnum += control->pool->size)
> > -            {
> > -                HMAP_FOR_EACH_IN_PARALLEL (
> > -                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> > -                    if (stop_parallel_processing()) {
> > -                        return NULL;
> > -                    }
> > -                    if (igmp_group->datapath->nbs) {
> > -                        build_lswitch_ip_mcast_igmp_mld(igmp_group,
> > -                                                        lsi->lflows,
> > -                                                        &lsi->actions,
> > -                                                        &lsi->match,
> NULL);
> > -                    } else {
> > -                        build_igmp_flows_for_lrouter(igmp_group,
> lsi->lflows,
> > -                                                     &lsi->actions,
> > -                                                     &lsi->match, NULL);
> > -                    }
> > -                }
> > -            }
> >               lsi->thread_lflow_counter = thread_lflow_counter;
> >           }
> >           post_completed_work(control);
> > @@ -17366,7 +17343,6 @@ build_lswitch_and_lrouter_flows(
> >       const struct lr_stateful_table *lr_stateful_table,
> >       const struct ls_stateful_table *ls_stateful_table,
> >       struct lflow_table *lflows,
> > -    struct hmap *igmp_groups,
> >       const struct shash *meter_groups,
> >       const struct hmap *lb_dps_map,
> >       const struct hmap *svc_monitor_map,
> > @@ -17401,7 +17377,6 @@ build_lswitch_and_lrouter_flows(
> >               lsiv[index].ls_port_groups = ls_pgs;
> >               lsiv[index].lr_stateful_table = lr_stateful_table;
> >               lsiv[index].ls_stateful_table = ls_stateful_table;
> > -            lsiv[index].igmp_groups = igmp_groups;
> >               lsiv[index].meter_groups = meter_groups;
> >               lsiv[index].lb_dps_map = lb_dps_map;
> >               lsiv[index].svc_monitor_map = svc_monitor_map;
> > @@ -17432,7 +17407,6 @@ build_lswitch_and_lrouter_flows(
> >       } else {
> >           const struct lr_stateful_record *lr_stateful_rec;
> >           const struct ls_stateful_record *ls_stateful_rec;
> > -        struct ovn_igmp_group *igmp_group;
> >           struct ovn_lb_datapaths *lb_dps;
> >           struct ovn_datapath *od;
> >           struct ovn_port *op;
> > @@ -17446,7 +17420,6 @@ build_lswitch_and_lrouter_flows(
> >               .lr_stateful_table = lr_stateful_table,
> >               .ls_stateful_table = ls_stateful_table,
> >               .lflows = lflows,
> > -            .igmp_groups = igmp_groups,
> >               .meter_groups = meter_groups,
> >               .lb_dps_map = lb_dps_map,
> >               .svc_monitor_map = svc_monitor_map,
> > @@ -17535,19 +17508,6 @@ build_lswitch_and_lrouter_flows(
> >                                       lsi.features,
> >                                       lsi.lflows);
> >           }
> > -        stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
> > -        stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> > -        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> > -            if (igmp_group->datapath->nbs) {
> > -                build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi.lflows,
> > -                                                &lsi.actions,
> &lsi.match,
> > -                                                NULL);
> > -            } else {
> > -                build_igmp_flows_for_lrouter(igmp_group, lsi.lflows,
> > -                                             &lsi.actions, &lsi.match,
> NULL);
> > -            }
> > -        }
> > -        stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> >
> >           ds_destroy(&lsi.match);
> >           ds_destroy(&lsi.actions);
> > @@ -17556,6 +17516,40 @@ build_lswitch_and_lrouter_flows(
> >       free(svc_check_match);
> >   }
> >
> > +/* The IGMP flows have to be built in main thread because there is
> > + * single lflow_ref for all of them which isn't thread safe.
> > + * This shouldn't affect performance as there is a limited how many
> > + * IGMP groups can be created. */
> > +void
> > +build_igmp_lflows(struct hmap *igmp_groups, const struct hmap
> *ls_datapaths,
> > +                  struct lflow_table *lflows, struct lflow_ref
> *lflow_ref)
> > +{
> > +    struct ds actions = DS_EMPTY_INITIALIZER;
> > +    struct ds match = DS_EMPTY_INITIALIZER;
> > +
> > +    struct ovn_datapath *od;
> > +    HMAP_FOR_EACH (od, key_node, ls_datapaths) {
> > +        init_mcast_flow_count(od);
> > +        build_mcast_flood_lswitch(od, lflows, &actions, lflow_ref);
> > +    }
> > +
> > +    stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> > +    struct ovn_igmp_group *igmp_group;
> > +    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> > +        if (igmp_group->datapath->nbs) {
> > +            build_lswitch_ip_mcast_igmp_mld(igmp_group, lflows,
> &actions,
> > +                                            &match, lflow_ref);
> > +        } else {
> > +            build_igmp_flows_for_lrouter(igmp_group, lflows, &actions,
> > +                                         &match, lflow_ref);
> > +        }
> > +    }
> > +    stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
> > +
> > +    ds_destroy(&actions);
> > +    ds_destroy(&match);
> > +}
> > +
> >   void run_update_worker_pool(int n_threads)
> >   {
> >       /* If number of threads has been updated (or initially set),
> > @@ -17580,20 +17574,6 @@ void build_lflows(struct ovsdb_idl_txn
> *ovnsb_txn,
> >                     struct lflow_input *input_data,
> >                     struct lflow_table *lflows)
> >   {
> > -    struct hmap mcast_groups;
> > -    struct hmap igmp_groups;
> > -
> > -    struct ovn_datapath *od;
> > -    HMAP_FOR_EACH (od, key_node, &input_data->ls_datapaths->datapaths) {
> > -        init_mcast_flow_count(od);
> > -    }
> > -
> > -    build_mcast_groups(input_data->sbrec_igmp_group_table,
> > -                       input_data->sbrec_mcast_group_by_name_dp,
> > -                       &input_data->ls_datapaths->datapaths,
> > -                       input_data->ls_ports, input_data->lr_ports,
> > -                       &mcast_groups, &igmp_groups);
> > -
> >       build_lswitch_and_lrouter_flows(input_data->ls_datapaths,
> >                                       input_data->lr_datapaths,
> >                                       input_data->ls_ports,
> > @@ -17602,7 +17582,6 @@ void build_lflows(struct ovsdb_idl_txn
> *ovnsb_txn,
> >                                       input_data->lr_stateful_table,
> >                                       input_data->ls_stateful_table,
> >                                       lflows,
> > -                                    &igmp_groups,
> >                                       input_data->meter_groups,
> >                                       input_data->lb_datapaths_map,
> >                                       input_data->svc_monitor_map,
> > @@ -17613,6 +17592,9 @@ void build_lflows(struct ovsdb_idl_txn
> *ovnsb_txn,
> >                                       input_data->parsed_routes,
> >                                       input_data->route_policies,
> >                                       input_data->route_tables);
> > +    build_igmp_lflows(input_data->igmp_groups,
> > +                      &input_data->ls_datapaths->datapaths,
> > +                      lflows, input_data->igmp_lflow_ref);
> >
> >       if (parallelization_state == STATE_INIT_HASH_SIZES) {
> >           parallelization_state = STATE_USE_PARALLELIZATION;
> > @@ -17630,13 +17612,6 @@ void build_lflows(struct ovsdb_idl_txn
> *ovnsb_txn,
> >                              input_data->sbrec_logical_dp_group_table);
> >
> >       stopwatch_stop(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
> > -
> > -    sync_multicast_groups_to_sb(ovnsb_txn,
> > -                                input_data->sbrec_multicast_group_table,
> > -                                &input_data->ls_datapaths->datapaths,
> > -                                &input_data->lr_datapaths->datapaths,
> > -                                &mcast_groups);
> > -    ovn_igmp_groups_destroy(&igmp_groups);
> >   }
> >
> >   void
> > diff --git a/northd/northd.h b/northd/northd.h
> > index 218f0f62d..6bca6bb1a 100644
> > --- a/northd/northd.h
> > +++ b/northd/northd.h
> > @@ -198,13 +198,12 @@ struct bfd_sync_data {
> >       struct sset bfd_ports;
> >   };
> >
> > +struct lflow_ref;
> >   struct lr_nat_table;
> >
> >   struct lflow_input {
> >       /* Southbound table references */
> >       const struct sbrec_logical_flow_table *sbrec_logical_flow_table;
> > -    const struct sbrec_multicast_group_table
> *sbrec_multicast_group_table;
> > -    const struct sbrec_igmp_group_table *sbrec_igmp_group_table;
> >       const struct sbrec_logical_dp_group_table
> *sbrec_logical_dp_group_table;
> >
> >       /* Indexes */
> > @@ -228,6 +227,8 @@ struct lflow_input {
> >       struct hmap *parsed_routes;
> >       struct hmap *route_policies;
> >       struct simap *route_tables;
> > +    struct hmap *igmp_groups;
> > +    struct lflow_ref *igmp_lflow_ref;
> >   };
> >
> >   extern int parallelization_state;
> > @@ -896,5 +897,8 @@ lsp_is_router(const struct nbrec_logical_switch_port
> *nbsp)
> >   }
> >
> >   struct ovn_port *ovn_port_find(const struct hmap *ports, const char
> *name);
> > -
> > +void build_igmp_lflows(struct hmap *igmp_groups,
> > +                       const struct hmap *ls_datapaths,
> > +                       struct lflow_table *lflows,
> > +                       struct lflow_ref *lflow_ref);
> >   #endif /* NORTHD_H */
> > diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> > index 507cc302f..91ba5b736 100644
> > --- a/tests/ovn-northd.at
> > +++ b/tests/ovn-northd.at
> > @@ -14384,3 +14384,92 @@ AT_CHECK([ovn-sbctl lflow-list S1 | grep
> ls_out_acl_action | grep priority=500 |
> >
> >   AT_CLEANUP
> >   ])
> > +
> > +OVN_FOR_EACH_NORTHD_NO_HV([
> > +AT_SETUP([IGMP incremental processing])
> > +
> > +check_recompute_counter() {
> > +    lflow_recomp=$(as northd ovn-appctl -t ovn-northd
> inc-engine/show-stats lflow recompute)
> > +    AT_CHECK([test x$lflow_recomp = x$1])
> > +}
> > +ovn_start
> > +
> > +net_add n1
> > +sim_add hv1
> > +as hv1
> > +
> > +ovs-vsctl add-br br-phys
> > +ovn_attach n1 br-phys 192.168.0.11
> > +
> > +sim_add hv2
> > +as hv2
> > +
> > +check ovs-vsctl add-br br-phys
> > +ovn_attach n1 br-phys 192.168.0.2
> > +
> > +check ovn-nbctl ls-add sw1
> > +check ovn-nbctl ls-add sw2
> > +
> > +check ovn-nbctl lsp-add sw1 sw1-p11
> > +check ovn-nbctl lsp-add sw2 sw2-p21
> > +
> > +check ovn-nbctl lr-add rtr
> > +check ovn-nbctl lrp-add rtr rtr-sw1 00:00:00:00:01:00 10.0.0.254/24
> > +check ovn-nbctl lrp-add rtr rtr-sw2 00:00:00:00:02:00 10.0.0.254/24
> > +
> > +
> > +check ovn-nbctl lsp-add sw1 sw1-rtr \
> > +    -- lsp-set-type sw1-rtr router  \
> > +    -- lsp-set-addresses sw1-rtr 00:00:00:00:01:00 \
> > +    -- lsp-set-options sw1-rtr router-port=rtr-sw1
> > +
> > +check ovn-nbctl lsp-add sw2 sw2-rtr \
> > +    -- lsp-set-type sw2-rtr router  \
> > +    -- lsp-set-addresses sw1-rtr 00:00:00:00:02:00 \
> > +    -- lsp-set-options sw2-rtr router-port=rtr-sw2
> > +
> > +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> > +
> > +# Create IGMP_Group 239.0.1.68 with port sw1-p11
> > +ovn-sbctl create IGMP_Group address=239.0.1.68 \
> > +    datapath=$(fetch_column Datapath_Binding _uuid
> external_ids:name=sw1) \
> > +    chassis=$(fetch_column Chassis _uuid name=hv1) \
> > +    chassis_name=hv1 \
> > +    ports=$(fetch_column Port_Binding _uuid logical_port=sw1-p11)
> > +igmp_uuid=$(fetch_column IGMP_GROUP _uuid address=239.0.1.68)
> > +
> > +check ovn-nbctl --wait=sb sync
> > +wait_row_count Igmp_Group 1 address=239.0.1.68
> > +wait_row_count Multicast_Group 1 name="239.0.1.68"
> > +wait_row_count Multicast_Group  1 name="239.0.1.68"
> ports='[['$(fetch_column Port_Binding _uuid logical_port=sw1-p11)']]'
> > +ovn-sbctl list igmp_group
> > +check_recompute_counter 0
> > +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> > +
> > +check ovn-nbctl set logical_router rtr \
> > +    options:mcast_relay="true"
> > +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> > +# Update IGMP_Group 239.0.1.68 to include sw2-p21
> > +ovn-sbctl add IGMP_Group $igmp_uuid ports $(fetch_column Port_Binding
> _uuid logical_port=sw2-p21)
> > +
> > +check ovn-nbctl --wait=sb sync
> > +wait_row_count IGMP_Group 1 address=239.0.1.68
> > +
> > +# Check that new Multicast_Group is created
> > +wait_row_count Multicast_Group 2 name=239.0.1.68
> > +check_recompute_counter 0
> > +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> > +
> > +check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
> > +# Delete IGMP_Group 239.0.1.68
> > +ovn-sbctl destroy IGMP_Group $igmp_uuid
> > +check ovn-nbctl --wait=sb sync
> > +check_recompute_counter 0
> > +CHECK_NO_CHANGE_AFTER_RECOMPUTE
> > +
> > +wait_row_count IGMP_Group 0 address=239.0.1.68
> > +wait_row_count Multicast_Group 0 name=239.0.1.68
> > +
> > +OVN_CLEANUP([hv1], [hv2])
> > +AT_CLEANUP
> > +])
>
>
Thanks,
Ales
diff mbox series

Patch

diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index fa1f0236d..e2816f4da 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -23,6 +23,7 @@ 
 #include "en-lr-nat.h"
 #include "en-lr-stateful.h"
 #include "en-ls-stateful.h"
+#include "en-multicast.h"
 #include "en-northd.h"
 #include "en-meters.h"
 #include "en-sampling-app.h"
@@ -56,13 +57,11 @@  lflow_get_input_data(struct engine_node *node,
         engine_get_input_data("lr_stateful", node);
     struct ed_type_ls_stateful *ls_stateful_data =
         engine_get_input_data("ls_stateful", node);
+    struct multicast_igmp_data *multicat_igmp_data =
+        engine_get_input_data("multicast_igmp", node);
 
     lflow_input->sbrec_logical_flow_table =
         EN_OVSDB_GET(engine_get_input("SB_logical_flow", node));
-    lflow_input->sbrec_multicast_group_table =
-        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
-    lflow_input->sbrec_igmp_group_table =
-        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
     lflow_input->sbrec_logical_dp_group_table =
         EN_OVSDB_GET(engine_get_input("SB_logical_dp_group", node));
 
@@ -85,6 +84,8 @@  lflow_get_input_data(struct engine_node *node,
     lflow_input->parsed_routes = &routes_data->parsed_routes;
     lflow_input->route_tables = &routes_data->route_tables;
     lflow_input->route_policies = &route_policies_data->route_policies;
+    lflow_input->igmp_groups = &multicat_igmp_data->igmp_groups;
+    lflow_input->igmp_lflow_ref = multicat_igmp_data->lflow_ref;
 
     struct ed_type_global_config *global_config =
         engine_get_input_data("global_config", node);
@@ -110,6 +111,7 @@  void en_lflow_run(struct engine_node *node, void *data)
     struct lflow_data *lflow_data = data;
     lflow_table_clear(lflow_data->lflow_table);
     lflow_reset_northd_refs(&lflow_input);
+    lflow_ref_clear(lflow_input.igmp_lflow_ref);
 
     build_lflows(eng_ctx->ovnsb_idl_txn, &lflow_input,
                  lflow_data->lflow_table);
@@ -219,6 +221,48 @@  lflow_ls_stateful_handler(struct engine_node *node, void *data)
     return true;
 }
 
+bool
+lflow_multicast_igmp_handler(struct engine_node *node, void *data)
+{
+    struct multicast_igmp_data *mcast_igmp_data =
+        engine_get_input_data("multicast_igmp", node);
+
+    const struct engine_context *eng_ctx = engine_get_context();
+    struct lflow_data *lflow_data = data;
+    struct lflow_input lflow_input;
+    lflow_get_input_data(node, &lflow_input);
+
+    if (!lflow_ref_resync_flows(mcast_igmp_data->lflow_ref,
+                                lflow_data->lflow_table,
+                                eng_ctx->ovnsb_idl_txn,
+                                lflow_input.ls_datapaths,
+                                lflow_input.lr_datapaths,
+                                lflow_input.ovn_internal_version_changed,
+                                lflow_input.sbrec_logical_flow_table,
+                                lflow_input.sbrec_logical_dp_group_table)) {
+        return false;
+    }
+
+    build_igmp_lflows(&mcast_igmp_data->igmp_groups,
+                      &lflow_input.ls_datapaths->datapaths,
+                      lflow_data->lflow_table,
+                      mcast_igmp_data->lflow_ref);
+
+    if (!lflow_ref_sync_lflows(mcast_igmp_data->lflow_ref,
+                               lflow_data->lflow_table,
+                               eng_ctx->ovnsb_idl_txn,
+                               lflow_input.ls_datapaths,
+                               lflow_input.lr_datapaths,
+                               lflow_input.ovn_internal_version_changed,
+                               lflow_input.sbrec_logical_flow_table,
+                               lflow_input.sbrec_logical_dp_group_table)) {
+        return false;
+    }
+
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
 void *en_lflow_init(struct engine_node *node OVS_UNUSED,
                      struct engine_arg *arg OVS_UNUSED)
 {
diff --git a/northd/en-lflow.h b/northd/en-lflow.h
index 32cae6176..f90f5c61c 100644
--- a/northd/en-lflow.h
+++ b/northd/en-lflow.h
@@ -22,5 +22,6 @@  bool lflow_northd_handler(struct engine_node *, void *data);
 bool lflow_port_group_handler(struct engine_node *, void *data);
 bool lflow_lr_stateful_handler(struct engine_node *, void *data);
 bool lflow_ls_stateful_handler(struct engine_node *node, void *data);
+bool lflow_multicast_igmp_handler(struct engine_node *node, void *data);
 
 #endif /* EN_LFLOW_H */
diff --git a/northd/en-multicast.c b/northd/en-multicast.c
index 0f07cf2fe..59a36f38b 100644
--- a/northd/en-multicast.c
+++ b/northd/en-multicast.c
@@ -22,6 +22,7 @@ 
 
 /* OVN includes. */
 #include "en-multicast.h"
+#include "lflow-mgr.h"
 #include "lib/ip-mcast-index.h"
 #include "lib/mcast-group-index.h"
 #include "lib/ovn-l7.h"
@@ -47,6 +48,16 @@  static const struct multicast_group mc_unknown =
 static const struct multicast_group mc_flood_l2 =
     { MC_FLOOD_L2, OVN_MCAST_FLOOD_L2_TUNNEL_KEY };
 
+static void build_mcast_groups(
+    struct multicast_igmp_data *, const struct sbrec_igmp_group_table *,
+    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
+    const struct hmap *ls_datapaths, const struct hmap *ls_ports,
+    const struct hmap *lr_ports);
+static void sync_multicast_groups_to_sb(
+    struct multicast_igmp_data *, struct ovsdb_idl_txn *,
+    const struct sbrec_multicast_group_table *,
+    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths);
+
 static bool multicast_group_equal(const struct multicast_group *,
                                   const struct multicast_group *);
 static uint32_t ovn_multicast_hash(const struct ovn_datapath *,
@@ -65,6 +76,7 @@  static void ovn_multicast_destroy(struct hmap *mcgroups,
                                   struct ovn_multicast *);
 static void ovn_multicast_update_sbrec(const struct ovn_multicast *,
                                        const struct sbrec_multicast_group *);
+static void ovn_multicast_groups_destroy(struct hmap *mcast_groups);
 
 static uint32_t ovn_igmp_group_hash(const struct ovn_datapath *,
                                     const struct in6_addr *);
@@ -88,29 +100,123 @@  static void ovn_igmp_group_aggregate_ports(struct ovn_igmp_group *,
                                            struct hmap *mcast_groups);
 static void ovn_igmp_group_destroy(struct hmap *igmp_groups,
                                    struct ovn_igmp_group *);
+static void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
+
+void *
+en_multicast_igmp_init(struct engine_node *node OVS_UNUSED,
+                       struct engine_arg *arg OVS_UNUSED)
+{
+    struct multicast_igmp_data *data =xmalloc(sizeof *data);
+    hmap_init(&data->mcast_groups);
+    hmap_init(&data->igmp_groups);
+    data->lflow_ref = lflow_ref_create();
+
+    return data;
+}
 
 void
-build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
+en_multicast_igmp_run(struct engine_node *node, void *data_)
+{
+    struct multicast_igmp_data *data = data_;
+    struct northd_data *northd_data = engine_get_input_data("northd", node);
+    const struct sbrec_igmp_group_table *sbrec_igmp_group_table =
+        EN_OVSDB_GET(engine_get_input("SB_igmp_group", node));
+    const struct sbrec_multicast_group_table *sbrec_multicast_group_table =
+        EN_OVSDB_GET(engine_get_input("SB_multicast_group", node));
+    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp =
+        engine_ovsdb_node_get_index(
+            engine_get_input("SB_multicast_group", node),
+            "sbrec_mcast_group_by_name");
+    const struct engine_context *eng_ctx = engine_get_context();
+
+    ovn_multicast_groups_destroy(&data->mcast_groups);
+    ovn_igmp_groups_destroy(&data->igmp_groups);
+
+    build_mcast_groups(data, sbrec_igmp_group_table,
+                      sbrec_mcast_group_by_name_dp,
+                      &northd_data->ls_datapaths.datapaths,
+                      &northd_data->ls_ports,
+                      &northd_data->lr_ports);
+    sync_multicast_groups_to_sb(data, eng_ctx->ovnsb_idl_txn,
+                                sbrec_multicast_group_table,
+                                &northd_data->ls_datapaths.datapaths,
+                                &northd_data->lr_datapaths.datapaths);
+
+    engine_set_node_state(node, EN_UPDATED);
+}
+
+bool
+multicast_igmp_northd_handler(struct engine_node *node, void *data OVS_UNUSED)
+{
+    struct northd_data *northd_data = engine_get_input_data("northd", node);
+    if (!northd_has_tracked_data(&northd_data->trk_data)) {
+        return false;
+    }
+
+    /* This node uses the below data from the en_northd engine node.
+     *      - northd_data->lr_datapaths
+     *      - northd_data->ls_ports
+     *      - northd_data->lr_ports
+     *
+     *      This data gets updated when a logical router is created or deleted.
+     *      northd engine node presently falls back to full recompute when
+     *      this happens and so does this node.
+     *      Note: When we add I-P to the created/deleted logical routers, we
+     *      need to revisit this handler.
+     *
+     *      This node also accesses the router ports of the logical router
+     *      (od->ports).  When these logical router ports gets updated,
+     *      en_northd engine recomputes and so does this node.
+     *      Note: When we add I-P to handle switch/router port changes, we
+     *      need to revisit this handler.
+     *
+     * */
+    return true;
+}
+
+void
+en_multicast_igmp_cleanup(void *data_)
+{
+    struct multicast_igmp_data *data = data_;
+
+    ovn_multicast_groups_destroy(&data->mcast_groups);
+    ovn_igmp_groups_destroy(&data->igmp_groups);
+    hmap_destroy(&data->mcast_groups);
+    hmap_destroy(&data->igmp_groups);
+    lflow_ref_destroy(data->lflow_ref);
+}
+
+struct sbrec_multicast_group *
+create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
+                          const struct sbrec_datapath_binding *dp,
+                          const char *name,
+                          int64_t tunnel_key)
+{
+    struct sbrec_multicast_group *sbmc =
+        sbrec_multicast_group_insert(ovnsb_txn);
+    sbrec_multicast_group_set_datapath(sbmc, dp);
+    sbrec_multicast_group_set_name(sbmc, name);
+    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
+    return sbmc;
+}
+
+static void
+build_mcast_groups(struct multicast_igmp_data *data,
+                   const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
                    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
                    const struct hmap *ls_datapaths,
                    const struct hmap *ls_ports,
-                   const struct hmap *lr_ports,
-                   struct hmap *mcast_groups,
-                   struct hmap *igmp_groups)
-{
+                   const struct hmap *lr_ports) {
     struct ovn_datapath *od;
     struct ovn_port *op;
 
-    hmap_init(mcast_groups);
-    hmap_init(igmp_groups);
-
     HMAP_FOR_EACH (op, key_node, lr_ports) {
         if (lrport_is_enabled(op->nbrp)) {
             /* If this port is configured to always flood multicast traffic
              * add it to the MC_STATIC group.
              */
             if (op->mcast_info.flood) {
-                ovn_multicast_add(mcast_groups, &mc_static, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
                 op->od->mcast_info.rtr.flood_static = true;
             }
         }
@@ -118,14 +224,14 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
 
     HMAP_FOR_EACH (op, key_node, ls_ports) {
         if (lsp_is_enabled(op->nbsp)) {
-            ovn_multicast_add(mcast_groups, &mc_flood, op);
+            ovn_multicast_add(&data->mcast_groups, &mc_flood, op);
 
             if (!lsp_is_router(op->nbsp)) {
-                ovn_multicast_add(mcast_groups, &mc_flood_l2, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_flood_l2, op);
             }
 
             if (op->has_unknown) {
-                ovn_multicast_add(mcast_groups, &mc_unknown, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_unknown, op);
             }
 
             /* If this port is connected to a multicast router then add it
@@ -133,7 +239,7 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
              */
             if (op->od->mcast_info.sw.flood_relay && op->peer &&
                 op->peer->od && op->peer->od->mcast_info.rtr.relay) {
-                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_mrouter_flood, op);
             }
 
             /* If this port is configured to always flood multicast reports
@@ -141,7 +247,7 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
              * flooded to statically configured or learned mrouters).
              */
             if (op->mcast_info.flood_reports) {
-                ovn_multicast_add(mcast_groups, &mc_mrouter_flood, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_mrouter_flood, op);
                 op->od->mcast_info.sw.flood_reports = true;
             }
 
@@ -149,7 +255,7 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
              * add it to the MC_STATIC group.
              */
             if (op->mcast_info.flood) {
-                ovn_multicast_add(mcast_groups, &mc_static, op);
+                ovn_multicast_add(&data->mcast_groups, &mc_static, op);
                 op->od->mcast_info.sw.flood_static = true;
             }
         }
@@ -202,7 +308,8 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
          * if the multicast group already exists.
          */
         struct ovn_igmp_group *igmp_group =
-            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp, igmp_groups, od,
+            ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
+                               &data->igmp_groups, od,
                                &group_address, sb_igmp->address);
 
         /* Add the extracted ports to the IGMP group. */
@@ -240,7 +347,7 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
 
             struct ovn_igmp_group *igmp_group_rtr =
                 ovn_igmp_group_add(sbrec_mcast_group_by_name_dp,
-                                   igmp_groups, router_port->od,
+                                   &data->igmp_groups, router_port->od,
                                    &group_address, igmp_group->mcgroup.name);
             struct ovn_port **router_igmp_ports =
                 xmalloc(sizeof *router_igmp_ports);
@@ -260,41 +367,41 @@  build_mcast_groups(const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
      * explicitly.
      */
     struct ovn_igmp_group *igmp_group;
-    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
+    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, &data->igmp_groups) {
 
         /* If this is a mrouter entry just aggregate the mrouter ports
          * into the MC_MROUTER mcast_group and destroy the igmp_group;
          * no more processing needed. */
         if (!strcmp(igmp_group->mcgroup.name, OVN_IGMP_GROUP_MROUTERS)) {
-            ovn_igmp_mrouter_aggregate_ports(igmp_group, mcast_groups);
-            ovn_igmp_group_destroy(igmp_groups, igmp_group);
+            ovn_igmp_mrouter_aggregate_ports(igmp_group, &data->mcast_groups);
+            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
             continue;
         }
 
         if (!ovn_igmp_group_allocate_id(igmp_group)) {
             /* If we ran out of keys just destroy the entry. */
-            ovn_igmp_group_destroy(igmp_groups, igmp_group);
+            ovn_igmp_group_destroy(&data->igmp_groups, igmp_group);
             continue;
         }
 
         /* Aggregate the ports from all entries corresponding to this
          * group.
          */
-        ovn_igmp_group_aggregate_ports(igmp_group, mcast_groups);
+        ovn_igmp_group_aggregate_ports(igmp_group, &data->mcast_groups);
     }
 }
 
-void
+static void
 sync_multicast_groups_to_sb(
-    struct ovsdb_idl_txn *ovnsb_txn,
+    struct multicast_igmp_data *data, struct ovsdb_idl_txn *ovnsb_txn,
     const struct sbrec_multicast_group_table *sbrec_multicast_group_table,
-    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
-    struct hmap *mcast_groups)
+    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths)
 {
+    struct hmapx mcast_in_sb = HMAPX_INITIALIZER(&mcast_in_sb);
+
     /* Push changes to the Multicast_Group table to database. */
     const struct sbrec_multicast_group *sbmc;
-    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH_SAFE (
-        sbmc, sbrec_multicast_group_table) {
+    SBREC_MULTICAST_GROUP_TABLE_FOR_EACH (sbmc, sbrec_multicast_group_table) {
         struct ovn_datapath *od = ovn_datapath_from_sbrec(ls_datapaths,
                                                           lr_datapaths,
                                                           sbmc->datapath);
@@ -306,55 +413,34 @@  sync_multicast_groups_to_sb(
 
         struct multicast_group group = { .name = sbmc->name,
             .key = sbmc->tunnel_key };
-        struct ovn_multicast *mc = ovn_multicast_find(mcast_groups,
+        struct ovn_multicast *mc = ovn_multicast_find(&data->mcast_groups,
                                                       od, &group);
         if (mc) {
             ovn_multicast_update_sbrec(mc, sbmc);
-            ovn_multicast_destroy(mcast_groups, mc);
+            hmapx_add(&mcast_in_sb, mc);
         } else {
             sbrec_multicast_group_delete(sbmc);
         }
     }
     struct ovn_multicast *mc;
-    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
+    HMAP_FOR_EACH_SAFE (mc, hmap_node, &data->mcast_groups) {
         if (!mc->datapath) {
-            ovn_multicast_destroy(mcast_groups, mc);
+            ovn_multicast_destroy(&data->mcast_groups, mc);
+            continue;
+        }
+
+        if (hmapx_contains(&mcast_in_sb, mc)) {
             continue;
         }
+
         sbmc = create_sb_multicast_group(ovnsb_txn, mc->datapath->sb,
                                          mc->group->name, mc->group->key);
         ovn_multicast_update_sbrec(mc, sbmc);
-        ovn_multicast_destroy(mcast_groups, mc);
     }
 
-    hmap_destroy(mcast_groups);
-}
-
-void
-ovn_igmp_groups_destroy(struct hmap *igmp_groups)
-{
-    struct ovn_igmp_group *igmp_group;
-    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
-        ovn_igmp_group_destroy(igmp_groups, igmp_group);
-    }
-    hmap_destroy(igmp_groups);
+    hmapx_destroy(&mcast_in_sb);
 }
 
-struct sbrec_multicast_group *
-create_sb_multicast_group(struct ovsdb_idl_txn *ovnsb_txn,
-                          const struct sbrec_datapath_binding *dp,
-                          const char *name,
-                          int64_t tunnel_key)
-{
-    struct sbrec_multicast_group *sbmc =
-        sbrec_multicast_group_insert(ovnsb_txn);
-    sbrec_multicast_group_set_datapath(sbmc, dp);
-    sbrec_multicast_group_set_name(sbmc, name);
-    sbrec_multicast_group_set_tunnel_key(sbmc, tunnel_key);
-    return sbmc;
-}
-
-
 static bool
 multicast_group_equal(const struct multicast_group *a,
                       const struct multicast_group *b)
@@ -362,7 +448,6 @@  multicast_group_equal(const struct multicast_group *a,
     return !strcmp(a->name, b->name) && a->key == b->key;
 }
 
-
 static uint32_t
 ovn_multicast_hash(const struct ovn_datapath *datapath,
                    const struct multicast_group *group)
@@ -452,6 +537,15 @@  ovn_multicast_update_sbrec(const struct ovn_multicast *mc,
     free(ports);
 }
 
+static void
+ovn_multicast_groups_destroy(struct hmap *mcast_groups)
+{
+    struct ovn_multicast *mc;
+    HMAP_FOR_EACH_SAFE (mc, hmap_node, mcast_groups) {
+        ovn_multicast_destroy(mcast_groups, mc);
+    }
+}
+
 static uint32_t
 ovn_igmp_group_hash(const struct ovn_datapath *datapath,
                     const struct in6_addr *address)
@@ -644,3 +738,12 @@  ovn_igmp_group_destroy(struct hmap *igmp_groups,
         free(igmp_group);
     }
 }
+
+static void
+ovn_igmp_groups_destroy(struct hmap *igmp_groups)
+{
+    struct ovn_igmp_group *igmp_group;
+    HMAP_FOR_EACH_SAFE (igmp_group, hmap_node, igmp_groups) {
+        ovn_igmp_group_destroy(igmp_groups, igmp_group);
+    }
+}
diff --git a/northd/en-multicast.h b/northd/en-multicast.h
index 9a6848f78..3932b4b08 100644
--- a/northd/en-multicast.h
+++ b/northd/en-multicast.h
@@ -70,20 +70,16 @@  struct ovn_igmp_group {
     struct ovs_list entries; /* List of SB entries for this group. */
 };
 
-void build_mcast_groups(
-    const struct sbrec_igmp_group_table *sbrec_igmp_group_table,
-    struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp,
-    const struct hmap *ls_datapaths,
-    const struct hmap *ls_ports,
-    const struct hmap *lr_ports,
-    struct hmap *mcast_groups,
-    struct hmap *igmp_groups);
-void sync_multicast_groups_to_sb(
-    struct ovsdb_idl_txn *ovnsb_txn,
-    const struct sbrec_multicast_group_table *sbrec_multicast_group_table,
-    const struct hmap * ls_datapaths, const struct hmap *lr_datapaths,
-    struct hmap *mcast_groups);
-void ovn_igmp_groups_destroy(struct hmap *igmp_groups);
+struct multicast_igmp_data {
+    struct hmap mcast_groups;
+    struct hmap igmp_groups;
+    struct lflow_ref *lflow_ref;
+};
+
+void *en_multicast_igmp_init(struct engine_node *,struct engine_arg *);
+void en_multicast_igmp_run(struct engine_node *, void *);
+bool multicast_igmp_northd_handler(struct engine_node *, void *);
+void en_multicast_igmp_cleanup(void *);
 struct sbrec_multicast_group *create_sb_multicast_group(
     struct ovsdb_idl_txn *ovnsb_txn, const struct sbrec_datapath_binding *,
     const char *name, int64_t tunnel_key);
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index 6e0aa04c4..a9990974c 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -34,6 +34,7 @@ 
 #include "en-lr-stateful.h"
 #include "en-lr-nat.h"
 #include "en-ls-stateful.h"
+#include "en-multicast.h"
 #include "en-northd.h"
 #include "en-lflow.h"
 #include "en-northd-output.h"
@@ -161,6 +162,7 @@  static ENGINE_NODE(route_policies, "route_policies");
 static ENGINE_NODE(routes, "routes");
 static ENGINE_NODE(bfd, "bfd");
 static ENGINE_NODE(bfd_sync, "bfd_sync");
+static ENGINE_NODE(multicast_igmp, "multicast_igmp");
 
 void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                           struct ovsdb_idl_loop *sb)
@@ -267,11 +269,15 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_sync_meters, &en_nb_meter, NULL);
     engine_add_input(&en_sync_meters, &en_sb_meter, NULL);
 
+    engine_add_input(&en_multicast_igmp, &en_northd,
+                     multicast_igmp_northd_handler);
+    engine_add_input(&en_multicast_igmp, &en_sb_multicast_group, NULL);
+    engine_add_input(&en_multicast_igmp, &en_sb_igmp_group, NULL);
+
     engine_add_input(&en_lflow, &en_nb_acl, NULL);
     engine_add_input(&en_lflow, &en_sync_meters, NULL);
     engine_add_input(&en_lflow, &en_sb_logical_flow, NULL);
     engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
-    engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
     engine_add_input(&en_lflow, &en_sb_logical_dp_group, NULL);
     engine_add_input(&en_lflow, &en_bfd_sync, NULL);
     engine_add_input(&en_lflow, &en_route_policies, NULL);
@@ -285,6 +291,8 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
     engine_add_input(&en_lflow, &en_lr_stateful, lflow_lr_stateful_handler);
     engine_add_input(&en_lflow, &en_ls_stateful, lflow_ls_stateful_handler);
+    engine_add_input(&en_lflow, &en_multicast_igmp,
+                     lflow_multicast_igmp_handler);
 
     engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
     engine_add_input(&en_sync_to_sb_addr_set, &en_lr_stateful, NULL);
diff --git a/northd/northd.c b/northd/northd.c
index 905f19ff1..4172f3396 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -16980,7 +16980,6 @@  struct lswitch_flow_build_info {
     const struct lr_stateful_table *lr_stateful_table;
     const struct ls_stateful_table *ls_stateful_table;
     struct lflow_table *lflows;
-    struct hmap *igmp_groups;
     const struct shash *meter_groups;
     const struct hmap *lb_dps_map;
     const struct hmap *svc_monitor_map;
@@ -17145,7 +17144,6 @@  build_lflows_thread(void *arg)
     const struct lr_stateful_record *lr_stateful_rec;
     const struct ls_stateful_record *ls_stateful_rec;
     struct lswitch_flow_build_info *lsi;
-    struct ovn_igmp_group *igmp_group;
     struct ovn_lb_datapaths *lb_dps;
     struct ovn_datapath *od;
     struct ovn_port *op;
@@ -17296,27 +17294,6 @@  build_lflows_thread(void *arg)
                 }
             }
 
-            for (bnum = control->id;
-                    bnum <= lsi->igmp_groups->mask;
-                    bnum += control->pool->size)
-            {
-                HMAP_FOR_EACH_IN_PARALLEL (
-                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
-                    if (stop_parallel_processing()) {
-                        return NULL;
-                    }
-                    if (igmp_group->datapath->nbs) {
-                        build_lswitch_ip_mcast_igmp_mld(igmp_group,
-                                                        lsi->lflows,
-                                                        &lsi->actions,
-                                                        &lsi->match, NULL);
-                    } else {
-                        build_igmp_flows_for_lrouter(igmp_group, lsi->lflows,
-                                                     &lsi->actions,
-                                                     &lsi->match, NULL);
-                    }
-                }
-            }
             lsi->thread_lflow_counter = thread_lflow_counter;
         }
         post_completed_work(control);
@@ -17366,7 +17343,6 @@  build_lswitch_and_lrouter_flows(
     const struct lr_stateful_table *lr_stateful_table,
     const struct ls_stateful_table *ls_stateful_table,
     struct lflow_table *lflows,
-    struct hmap *igmp_groups,
     const struct shash *meter_groups,
     const struct hmap *lb_dps_map,
     const struct hmap *svc_monitor_map,
@@ -17401,7 +17377,6 @@  build_lswitch_and_lrouter_flows(
             lsiv[index].ls_port_groups = ls_pgs;
             lsiv[index].lr_stateful_table = lr_stateful_table;
             lsiv[index].ls_stateful_table = ls_stateful_table;
-            lsiv[index].igmp_groups = igmp_groups;
             lsiv[index].meter_groups = meter_groups;
             lsiv[index].lb_dps_map = lb_dps_map;
             lsiv[index].svc_monitor_map = svc_monitor_map;
@@ -17432,7 +17407,6 @@  build_lswitch_and_lrouter_flows(
     } else {
         const struct lr_stateful_record *lr_stateful_rec;
         const struct ls_stateful_record *ls_stateful_rec;
-        struct ovn_igmp_group *igmp_group;
         struct ovn_lb_datapaths *lb_dps;
         struct ovn_datapath *od;
         struct ovn_port *op;
@@ -17446,7 +17420,6 @@  build_lswitch_and_lrouter_flows(
             .lr_stateful_table = lr_stateful_table,
             .ls_stateful_table = ls_stateful_table,
             .lflows = lflows,
-            .igmp_groups = igmp_groups,
             .meter_groups = meter_groups,
             .lb_dps_map = lb_dps_map,
             .svc_monitor_map = svc_monitor_map,
@@ -17535,19 +17508,6 @@  build_lswitch_and_lrouter_flows(
                                     lsi.features,
                                     lsi.lflows);
         }
-        stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
-        stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
-        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
-            if (igmp_group->datapath->nbs) {
-                build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi.lflows,
-                                                &lsi.actions, &lsi.match,
-                                                NULL);
-            } else {
-                build_igmp_flows_for_lrouter(igmp_group, lsi.lflows,
-                                             &lsi.actions, &lsi.match, NULL);
-            }
-        }
-        stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
 
         ds_destroy(&lsi.match);
         ds_destroy(&lsi.actions);
@@ -17556,6 +17516,40 @@  build_lswitch_and_lrouter_flows(
     free(svc_check_match);
 }
 
+/* The IGMP flows have to be built in main thread because there is
+ * single lflow_ref for all of them which isn't thread safe.
+ * This shouldn't affect performance as there is a limited how many
+ * IGMP groups can be created. */
+void
+build_igmp_lflows(struct hmap *igmp_groups, const struct hmap *ls_datapaths,
+                  struct lflow_table *lflows, struct lflow_ref *lflow_ref)
+{
+    struct ds actions = DS_EMPTY_INITIALIZER;
+    struct ds match = DS_EMPTY_INITIALIZER;
+
+    struct ovn_datapath *od;
+    HMAP_FOR_EACH (od, key_node, ls_datapaths) {
+        init_mcast_flow_count(od);
+        build_mcast_flood_lswitch(od, lflows, &actions, lflow_ref);
+    }
+
+    stopwatch_start(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
+    struct ovn_igmp_group *igmp_group;
+    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
+        if (igmp_group->datapath->nbs) {
+            build_lswitch_ip_mcast_igmp_mld(igmp_group, lflows, &actions,
+                                            &match, lflow_ref);
+        } else {
+            build_igmp_flows_for_lrouter(igmp_group, lflows, &actions,
+                                         &match, lflow_ref);
+        }
+    }
+    stopwatch_stop(LFLOWS_IGMP_STOPWATCH_NAME, time_msec());
+
+    ds_destroy(&actions);
+    ds_destroy(&match);
+}
+
 void run_update_worker_pool(int n_threads)
 {
     /* If number of threads has been updated (or initially set),
@@ -17580,20 +17574,6 @@  void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
                   struct lflow_input *input_data,
                   struct lflow_table *lflows)
 {
-    struct hmap mcast_groups;
-    struct hmap igmp_groups;
-
-    struct ovn_datapath *od;
-    HMAP_FOR_EACH (od, key_node, &input_data->ls_datapaths->datapaths) {
-        init_mcast_flow_count(od);
-    }
-
-    build_mcast_groups(input_data->sbrec_igmp_group_table,
-                       input_data->sbrec_mcast_group_by_name_dp,
-                       &input_data->ls_datapaths->datapaths,
-                       input_data->ls_ports, input_data->lr_ports,
-                       &mcast_groups, &igmp_groups);
-
     build_lswitch_and_lrouter_flows(input_data->ls_datapaths,
                                     input_data->lr_datapaths,
                                     input_data->ls_ports,
@@ -17602,7 +17582,6 @@  void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
                                     input_data->lr_stateful_table,
                                     input_data->ls_stateful_table,
                                     lflows,
-                                    &igmp_groups,
                                     input_data->meter_groups,
                                     input_data->lb_datapaths_map,
                                     input_data->svc_monitor_map,
@@ -17613,6 +17592,9 @@  void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
                                     input_data->parsed_routes,
                                     input_data->route_policies,
                                     input_data->route_tables);
+    build_igmp_lflows(input_data->igmp_groups,
+                      &input_data->ls_datapaths->datapaths,
+                      lflows, input_data->igmp_lflow_ref);
 
     if (parallelization_state == STATE_INIT_HASH_SIZES) {
         parallelization_state = STATE_USE_PARALLELIZATION;
@@ -17630,13 +17612,6 @@  void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
                            input_data->sbrec_logical_dp_group_table);
 
     stopwatch_stop(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
-
-    sync_multicast_groups_to_sb(ovnsb_txn,
-                                input_data->sbrec_multicast_group_table,
-                                &input_data->ls_datapaths->datapaths,
-                                &input_data->lr_datapaths->datapaths,
-                                &mcast_groups);
-    ovn_igmp_groups_destroy(&igmp_groups);
 }
 
 void
diff --git a/northd/northd.h b/northd/northd.h
index 218f0f62d..6bca6bb1a 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -198,13 +198,12 @@  struct bfd_sync_data {
     struct sset bfd_ports;
 };
 
+struct lflow_ref;
 struct lr_nat_table;
 
 struct lflow_input {
     /* Southbound table references */
     const struct sbrec_logical_flow_table *sbrec_logical_flow_table;
-    const struct sbrec_multicast_group_table *sbrec_multicast_group_table;
-    const struct sbrec_igmp_group_table *sbrec_igmp_group_table;
     const struct sbrec_logical_dp_group_table *sbrec_logical_dp_group_table;
 
     /* Indexes */
@@ -228,6 +227,8 @@  struct lflow_input {
     struct hmap *parsed_routes;
     struct hmap *route_policies;
     struct simap *route_tables;
+    struct hmap *igmp_groups;
+    struct lflow_ref *igmp_lflow_ref;
 };
 
 extern int parallelization_state;
@@ -896,5 +897,8 @@  lsp_is_router(const struct nbrec_logical_switch_port *nbsp)
 }
 
 struct ovn_port *ovn_port_find(const struct hmap *ports, const char *name);
-
+void build_igmp_lflows(struct hmap *igmp_groups,
+                       const struct hmap *ls_datapaths,
+                       struct lflow_table *lflows,
+                       struct lflow_ref *lflow_ref);
 #endif /* NORTHD_H */
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 507cc302f..91ba5b736 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -14384,3 +14384,92 @@  AT_CHECK([ovn-sbctl lflow-list S1 | grep ls_out_acl_action | grep priority=500 |
 
 AT_CLEANUP
 ])
+
+OVN_FOR_EACH_NORTHD_NO_HV([
+AT_SETUP([IGMP incremental processing])
+
+check_recompute_counter() {
+    lflow_recomp=$(as northd ovn-appctl -t ovn-northd inc-engine/show-stats lflow recompute)
+    AT_CHECK([test x$lflow_recomp = x$1])
+}
+ovn_start
+
+net_add n1
+sim_add hv1
+as hv1
+
+ovs-vsctl add-br br-phys
+ovn_attach n1 br-phys 192.168.0.11
+
+sim_add hv2
+as hv2
+
+check ovs-vsctl add-br br-phys
+ovn_attach n1 br-phys 192.168.0.2
+
+check ovn-nbctl ls-add sw1
+check ovn-nbctl ls-add sw2
+
+check ovn-nbctl lsp-add sw1 sw1-p11
+check ovn-nbctl lsp-add sw2 sw2-p21
+
+check ovn-nbctl lr-add rtr
+check ovn-nbctl lrp-add rtr rtr-sw1 00:00:00:00:01:00 10.0.0.254/24
+check ovn-nbctl lrp-add rtr rtr-sw2 00:00:00:00:02:00 10.0.0.254/24
+
+
+check ovn-nbctl lsp-add sw1 sw1-rtr \
+    -- lsp-set-type sw1-rtr router  \
+    -- lsp-set-addresses sw1-rtr 00:00:00:00:01:00 \
+    -- lsp-set-options sw1-rtr router-port=rtr-sw1
+
+check ovn-nbctl lsp-add sw2 sw2-rtr \
+    -- lsp-set-type sw2-rtr router  \
+    -- lsp-set-addresses sw1-rtr 00:00:00:00:02:00 \
+    -- lsp-set-options sw2-rtr router-port=rtr-sw2
+
+check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
+
+# Create IGMP_Group 239.0.1.68 with port sw1-p11
+ovn-sbctl create IGMP_Group address=239.0.1.68 \
+    datapath=$(fetch_column Datapath_Binding _uuid external_ids:name=sw1) \
+    chassis=$(fetch_column Chassis _uuid name=hv1) \
+    chassis_name=hv1 \
+    ports=$(fetch_column Port_Binding _uuid logical_port=sw1-p11)
+igmp_uuid=$(fetch_column IGMP_GROUP _uuid address=239.0.1.68)
+
+check ovn-nbctl --wait=sb sync
+wait_row_count Igmp_Group 1 address=239.0.1.68
+wait_row_count Multicast_Group 1 name="239.0.1.68"
+wait_row_count Multicast_Group  1 name="239.0.1.68" ports='[['$(fetch_column Port_Binding _uuid logical_port=sw1-p11)']]'
+ovn-sbctl list igmp_group
+check_recompute_counter 0
+CHECK_NO_CHANGE_AFTER_RECOMPUTE
+
+check ovn-nbctl set logical_router rtr \
+    options:mcast_relay="true"
+check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
+# Update IGMP_Group 239.0.1.68 to include sw2-p21
+ovn-sbctl add IGMP_Group $igmp_uuid ports $(fetch_column Port_Binding _uuid logical_port=sw2-p21)
+
+check ovn-nbctl --wait=sb sync
+wait_row_count IGMP_Group 1 address=239.0.1.68
+
+# Check that new Multicast_Group is created
+wait_row_count Multicast_Group 2 name=239.0.1.68
+check_recompute_counter 0
+CHECK_NO_CHANGE_AFTER_RECOMPUTE
+
+check as northd ovn-appctl -t ovn-northd inc-engine/clear-stats
+# Delete IGMP_Group 239.0.1.68
+ovn-sbctl destroy IGMP_Group $igmp_uuid
+check ovn-nbctl --wait=sb sync
+check_recompute_counter 0
+CHECK_NO_CHANGE_AFTER_RECOMPUTE
+
+wait_row_count IGMP_Group 0 address=239.0.1.68
+wait_row_count Multicast_Group 0 name=239.0.1.68
+
+OVN_CLEANUP([hv1], [hv2])
+AT_CLEANUP
+])