diff mbox series

[ovs-dev,v7,9/9] northd: Allow flow simplification for ACL sampling.

Message ID 20240807065126.38132-10-dceara@redhat.com
State Accepted
Headers show
Series Add ACL Sampling using per-flow IPFIX. | expand

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot-_Build_and_Test fail github build: failed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Dumitru Ceara Aug. 7, 2024, 6:51 a.m. UTC
From: Ales Musil <amusil@redhat.com>

Currently, OVN would generate up to 2 flows per sample, depending
on the configuration. Add optimization that can reduce the number
of flows added into the ACL pipeline down to 3 per collector. This
optimization can be achieved only when the sample action with
registers is supported in OvS and the sample has only single
collector. The single collector per sample should be the case
in most configurations, usually even the same collector
for all samples which greatly reduces the number of flows per
ACL with sampling.

If there are more collectors per sample or the OvS feature is not
supported, the implementation will fall back to flows per sample.

Reported-at: https://issues.redhat.com/browse/FDP-709
Signed-off-by: Ales Musil <amusil@redhat.com>
---
V7:
- Addressed Nadia's comment:
  - Increased number of ct mark bits used for storing the collector id
    to 8.
- Addressed Mark's comment:
  - cleaned up conditional match build.
V6:
- Rebased.
- Removed Dumitru's ack.
- Store (newly created) Sample_Collector.id in ct state - instead of the
  actual set-id to avoid ambiguity when multiple probabilities are used
  with the same collector set id.
- Fixed bug with stateful to-lport ACLs on router ports.
- Reduced number of ct mark bits used for storing the collector id to 4.
V5:
- Address Ilya's comments:
  - Explicitly set acl_observation_stage enum values.
- Added Dumitru's ack
---
 include/ovn/logical-fields.h |   2 +
 lib/logical-fields.c         |   8 +
 northd/northd.c              | 273 ++++++++++++++++++------
 tests/ovn-northd.at          | 388 +++++++++++++++++++++++++++++------
 tests/ovn.at                 |   2 +
 tests/system-ovn.at          |  10 +-
 6 files changed, 553 insertions(+), 130 deletions(-)

Comments

Mark Michelson Aug. 7, 2024, 5 p.m. UTC | #1
Thanks for all the updates Ales and Dumitru,

Acked-by: Mark Michelson <mmichels@redhat.com>

This means I have acked all patches in the series now.

On 8/7/24 02:51, Dumitru Ceara wrote:
> From: Ales Musil <amusil@redhat.com>
> 
> Currently, OVN would generate up to 2 flows per sample, depending
> on the configuration. Add optimization that can reduce the number
> of flows added into the ACL pipeline down to 3 per collector. This
> optimization can be achieved only when the sample action with
> registers is supported in OvS and the sample has only single
> collector. The single collector per sample should be the case
> in most configurations, usually even the same collector
> for all samples which greatly reduces the number of flows per
> ACL with sampling.
> 
> If there are more collectors per sample or the OvS feature is not
> supported, the implementation will fall back to flows per sample.
> 
> Reported-at: https://issues.redhat.com/browse/FDP-709
> Signed-off-by: Ales Musil <amusil@redhat.com>
> ---
> V7:
> - Addressed Nadia's comment:
>    - Increased number of ct mark bits used for storing the collector id
>      to 8.
> - Addressed Mark's comment:
>    - cleaned up conditional match build.
> V6:
> - Rebased.
> - Removed Dumitru's ack.
> - Store (newly created) Sample_Collector.id in ct state - instead of the
>    actual set-id to avoid ambiguity when multiple probabilities are used
>    with the same collector set id.
> - Fixed bug with stateful to-lport ACLs on router ports.
> - Reduced number of ct mark bits used for storing the collector id to 4.
> V5:
> - Address Ilya's comments:
>    - Explicitly set acl_observation_stage enum values.
> - Added Dumitru's ack
> ---
>   include/ovn/logical-fields.h |   2 +
>   lib/logical-fields.c         |   8 +
>   northd/northd.c              | 273 ++++++++++++++++++------
>   tests/ovn-northd.at          | 388 +++++++++++++++++++++++++++++------
>   tests/ovn.at                 |   2 +
>   tests/system-ovn.at          |  10 +-
>   6 files changed, 553 insertions(+), 130 deletions(-)
> 
> diff --git a/include/ovn/logical-fields.h b/include/ovn/logical-fields.h
> index ce79b501cf..d6c4a9b6b3 100644
> --- a/include/ovn/logical-fields.h
> +++ b/include/ovn/logical-fields.h
> @@ -197,6 +197,8 @@ const struct ovn_field *ovn_field_from_name(const char *name);
>   #define OVN_CT_NATTED_BIT  1
>   #define OVN_CT_LB_SKIP_SNAT_BIT 2
>   #define OVN_CT_LB_FORCE_SNAT_BIT 3
> +#define OVN_CT_OBS_STAGE_1ST_BIT 4
> +#define OVN_CT_OBS_STAGE_END_BIT 5
>   
>   #define OVN_CT_BLOCKED 1
>   #define OVN_CT_NATTED  2
> diff --git a/lib/logical-fields.c b/lib/logical-fields.c
> index 0c187e1c84..134d2674fd 100644
> --- a/lib/logical-fields.c
> +++ b/lib/logical-fields.c
> @@ -165,6 +165,14 @@ ovn_init_symtab(struct shash *symtab)
>                                       OVN_CT_STR(OVN_CT_LB_FORCE_SNAT_BIT)
>                                       "]",
>                                       WR_CT_COMMIT);
> +    expr_symtab_add_subfield_scoped(symtab, "ct_mark.obs_stage", NULL,
> +                                    "ct_mark["
> +                                    OVN_CT_STR(OVN_CT_OBS_STAGE_1ST_BIT) ".."
> +                                    OVN_CT_STR(OVN_CT_OBS_STAGE_END_BIT)
> +                                    "]",
> +                                    WR_CT_COMMIT);
> +    expr_symtab_add_subfield_scoped(symtab, "ct_mark.obs_collector_id", NULL,
> +                                    "ct_mark[16..23]", WR_CT_COMMIT);
>   
>       expr_symtab_add_field_scoped(symtab, "ct_label", MFF_CT_LABEL, NULL,
>                                    false, WR_CT_COMMIT);
> diff --git a/northd/northd.c b/northd/northd.c
> index 13f9faba31..70b13c8c98 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -144,8 +144,20 @@ static bool vxlan_mode;
>   #define REGBIT_ACL_VERDICT_ALLOW "reg8[16]"
>   #define REGBIT_ACL_VERDICT_DROP "reg8[17]"
>   #define REGBIT_ACL_VERDICT_REJECT "reg8[18]"
> +#define REGBIT_ACL_OBS_STAGE "reg8[19..20]"
>   #define REG_ACL_TIER "reg8[30..31]"
>   
> +enum acl_observation_stage {
> +    ACL_OBS_FROM_LPORT          = 0,
> +    ACL_OBS_FROM_LPORT_AFTER_LB = 1,
> +    ACL_OBS_TO_LPORT            = 2,
> +    ACL_OBS_STAGE_MAX
> +};
> +
> +/* enum acl_observation_stage_t values must fit in the 2 bits of
> + * REGBIT_ACL_OBS_STAGE .*/
> +BUILD_ASSERT_DECL(ACL_OBS_STAGE_MAX < (1 << 2));
> +
>   /* Indicate that this packet has been recirculated using egress
>    * loopback.  This allows certain checks to be bypassed, such as a
>    * logical router dropping packets with source IP address equals
> @@ -189,6 +201,8 @@ static bool vxlan_mode;
>    * domain and point ID. */
>   #define REG_OBS_POINT_ID_NEW "reg3"
>   #define REG_OBS_POINT_ID_EST "reg9"
> +#define REG_OBS_COLLECTOR_ID_NEW "reg8[0..7]"
> +#define REG_OBS_COLLECTOR_ID_EST "reg8[8..15]"
>   
>   /* Register used for temporarily store ECMP eth.src to avoid masked ct_label
>    * access. It doesn't really occupy registers because the content of the
> @@ -228,12 +242,13 @@ static bool vxlan_mode;
>    * +----+----------------------------------------------+ G |                                   |
>    * | R7 |                   UNUSED                     | 1 |                                   |
>    * +----+----------------------------------------------+---+-----------------------------------+
> - * |    |              LB_AFF_MATCH_PORT               |
> - * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  |
> - * +----+----------------------------------------------+
> - * | R9 |              OBS_POINT_ID_EST                |
> - * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |
> - * +----+----------------------------------------------+
> + * | R8 |              LB_AFF_MATCH_PORT               | X |      REG_OBS_COLLECTOR_ID_NEW     |
> + * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  | R |      REG_OBS_COLLECTOR_ID_EST     |
> + * |    |                                              | E |  (>= ACL_EVAL* && <= ACL_ACTION*) |
> + * +----+----------------------------------------------+ G +-----------------------------------+
> + * | R9 |              OBS_POINT_ID_EST                | 4 |                                   |
> + * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |   |                                   |
> + * +----+----------------------------------------------+---+-----------------------------------+
>    *
>    * Logical Router pipeline:
>    * +-----+---------------------------+---+-----------------+---+------------------------------------+
> @@ -6532,7 +6547,8 @@ build_acl_sample_action(struct ds *actions, const struct nbrec_acl *acl,
>   static void
>   build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
>                                 const struct nbrec_sample *sample_new,
> -                              const struct nbrec_sample *sample_est)
> +                              const struct nbrec_sample *sample_est,
> +                              enum acl_observation_stage obs_stage)
>   {
>       if (!acl->label && !sample_new && !sample_est) {
>           return;
> @@ -6540,6 +6556,8 @@ build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
>   
>       uint32_t point_id_new = 0;
>       uint32_t point_id_est = 0;
> +    uint8_t collector_id_new = 0;
> +    uint8_t collector_id_est = 0;
>   
>       if (acl->label) {
>           point_id_new = acl->label;
> @@ -6547,16 +6565,27 @@ build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
>       } else {
>           if (sample_new) {
>               point_id_new = sample_new->metadata;
> +            if (sample_new->n_collectors == 1) {
> +                collector_id_new = sample_new->collectors[0]->id;
> +            }
>           }
>           if (sample_est) {
>               point_id_est = sample_est->metadata;
> +            if (sample_est->n_collectors == 1) {
> +                collector_id_est = sample_est->collectors[0]->id;
> +            }
>           }
>       }
>   
>       ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
>                              REG_OBS_POINT_ID_NEW " = %"PRIu32"; "
> -                           REG_OBS_POINT_ID_EST " = %"PRIu32"; ",
> -                  point_id_new, point_id_est);
> +                           REG_OBS_POINT_ID_EST " = %"PRIu32"; "
> +                           REG_OBS_COLLECTOR_ID_NEW " = %"PRIu8"; "
> +                           REG_OBS_COLLECTOR_ID_EST " = %"PRIu8"; "
> +                           REGBIT_ACL_OBS_STAGE " = %"PRIu8"; ",
> +                  point_id_new, point_id_est,
> +                  collector_id_new, collector_id_est,
> +                  (uint8_t) obs_stage);
>   }
>   
>   /* This builds an ACL logical flow specific match that selects traffic
> @@ -6604,46 +6633,16 @@ build_acl_sample_label_match(struct ds *match, const struct nbrec_acl *acl,
>   }
>   
>   /* This builds a logical flow that samples and forwards/drops traffic
> - * that hit a stateless ACL ("pass" or "allow-stateless") that has sampling
> - * enabled.
> + * that hit a stateless/stateful ACL that has sampling enabled.
>    */
>   static void
> -build_acl_sample_new_stateless_flows(const struct ovn_datapath *od,
> -                                     struct lflow_table *lflows,
> -                                     enum ovn_stage stage,
> -                                     struct ds *match, struct ds *actions,
> -                                     const struct nbrec_acl *acl,
> -                                     uint8_t sample_domain_id,
> -                                     struct lflow_ref *lflow_ref)
> -{
> -    if (!acl->sample_new) {
> -        return;
> -    }
> -
> -    ds_clear(actions);
> -    ds_clear(match);
> -
> -    ds_put_cstr(match, "ip && ");
> -    build_acl_sample_register_match(match, acl, acl->sample_new);
> -
> -    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
> -
> -    ovn_lflow_add(lflows, od, stage, 1100, ds_cstr(match),
> -                  ds_cstr(actions), lflow_ref);
> -}
> -
> -/* This builds a logical flow that samples and forwards/drops traffic
> - * that created a new conntrack entry and hit a stateful ACL that has sampling
> - * enabled.
> - */
> -static void
> -build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
> -                                    struct lflow_table *lflows,
> -                                    enum ovn_stage stage,
> -                                    struct ds *match, struct ds *actions,
> -                                    const struct nbrec_acl *acl,
> -                                    uint8_t sample_domain_id,
> -                                    struct lflow_ref *lflow_ref)
> +build_acl_sample_new_flows(const struct ovn_datapath *od,
> +                           struct lflow_table *lflows,
> +                           enum ovn_stage stage,
> +                           struct ds *match, struct ds *actions,
> +                           const struct nbrec_acl *acl,
> +                           uint8_t sample_domain_id, bool stateful,
> +                           struct lflow_ref *lflow_ref)
>   {
>       if (!acl->sample_new) {
>           return;
> @@ -6652,12 +6651,16 @@ build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
>       ds_clear(actions);
>       ds_clear(match);
>   
> -    /* Match on new connections.  However, for to-lport ACLs, due to
> +    /* Match on new connections.  However, for stateful to-lport ACLs, due to
>        * skip_port_from_conntrack() conntrack state might be cleared, so
>        * take that into account too. */
> -    ds_put_format(match, "ip && %s && ",
> -                  stage != S_SWITCH_OUT_ACL_SAMPLE
> -                  ? "ct.new" : "(ct.new || !ct.trk)");
> +    if (!stateful) {
> +        ds_put_format(match, "ip && ");
> +    } else if (stage != S_SWITCH_OUT_ACL_SAMPLE) {
> +        ds_put_format(match, "ip && ct.new && ");
> +    } else {
> +        ds_put_format(match, "ip && (ct.new || !ct.trk) && ");
> +    }
>       build_acl_sample_register_match(match, acl, acl->sample_new);
>   
>       build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
> @@ -6758,6 +6761,112 @@ build_acl_sample_est_stateful_flows(const struct ovn_datapath *od,
>   
>   static void build_acl_reject_action(struct ds *actions, bool is_ingress);
>   
> +/* This builds a generic logical flow that samples traffic
> + * that hit a stateless/stateful ACL that has sampling enabled with
> + * single collector and all chassis supporting the sample with match action.
> + */
> +static void
> +build_acl_sample_generic_new_flows(const struct ovn_datapath *od,
> +                                   struct lflow_table *lflows,
> +                                   enum ovn_stage stage,
> +                                   enum acl_observation_stage obs_stage,
> +                                   struct ds *match, struct ds *actions,
> +                                   const struct nbrec_sample_collector *coll,
> +                                   uint8_t sample_domain_id, bool stateful,
> +                                   struct lflow_ref *lflow_ref)
> +{
> +    ds_clear(match);
> +    ds_clear(actions);
> +
> +    /* Match on new connections.  However, for stateful to-lport ACLs, due to
> +     * skip_port_from_conntrack() conntrack state might be cleared, so
> +     * take that into account too. */
> +    const char *new_conn_match = "ip";
> +    if (stateful) {
> +        if (stage != S_SWITCH_OUT_ACL_SAMPLE) {
> +            new_conn_match = "ip && ct.new";
> +        } else {
> +            new_conn_match = "ip && (ct.new || !ct.trk)";
> +        }
> +    }
> +
> +    ds_put_format(match, "%s && "REG_OBS_COLLECTOR_ID_NEW" == %"PRIu8" && "
> +                         REGBIT_ACL_OBS_STAGE " == %"PRIu8, new_conn_match,
> +                         (uint8_t) coll->id,
> +                         (uint8_t) obs_stage);
> +
> +    ds_put_format(actions, "sample(probability=%"PRIu16","
> +                           "collector_set=%"PRIu8","
> +                           "obs_domain=%"PRIu32","
> +                           "obs_point="REG_OBS_POINT_ID_NEW");"
> +                           " next;",
> +                           (uint16_t) coll->probability,
> +                           (uint8_t) coll->set_id,
> +                           sample_domain_id);
> +
> +    ovn_lflow_add(lflows, od, stage, stateful ? 1000 : 900, ds_cstr(match),
> +                  ds_cstr(actions), lflow_ref);
> +}
> +
> +/* This builds a generic logical flow that samples established traffic
> + * that hit a stateful ACL that has sampling enabled with
> + * single collector and all chassis supporting the sample with match action.
> + */
> +static void
> +build_acl_sample_generic_est_flows(const struct ovn_datapath *od,
> +                                   struct lflow_table *lflows,
> +                                   enum ovn_stage stage,
> +                                   enum acl_observation_stage obs_stage,
> +                                   struct ds *match, struct ds *actions,
> +                                   const struct nbrec_sample_collector *coll,
> +                                   uint8_t sample_domain_id,
> +                                   struct lflow_ref *lflow_ref)
> +{
> +    ds_clear(match);
> +    ds_clear(actions);
> +
> +    ds_put_cstr(match, "ip && ct.trk && (ct.est || ct.rel) && "
> +                       "ct_label.obs_unused == 0 && ");
> +
> +    size_t match_len = match->length;
> +    ds_put_format(match, "!ct.rpl && ct_mark.obs_collector_id == %"PRIu8" && "
> +                         "ct_mark.obs_stage == %"PRIu8,
> +                         (uint8_t) coll->id,
> +                         (uint8_t) obs_stage);
> +
> +    ds_put_format(actions, "sample(probability=%"PRIu16","
> +                           "collector_set=%"PRIu8","
> +                           "obs_domain=%"PRIu32","
> +                           "obs_point=ct_label.obs_point_id);"
> +                           " next;",
> +                           (uint16_t) coll->probability,
> +                           (uint8_t) coll->set_id,
> +                           sample_domain_id);
> +
> +    ovn_lflow_add(lflows, od, stage, 1000, ds_cstr(match),
> +                  ds_cstr(actions), lflow_ref);
> +
> +    enum ovn_stage rpl_stage = (stage == S_SWITCH_OUT_ACL_SAMPLE
> +                                ? S_SWITCH_IN_ACL_SAMPLE
> +                                : S_SWITCH_OUT_ACL_SAMPLE);
> +
> +    ds_truncate(match, match_len);
> +    ds_put_format(match, "ct.rpl && ct_mark.obs_collector_id == %"PRIu8,
> +                  (uint8_t) coll->id);
> +
> +    ovn_lflow_add(lflows, od, rpl_stage, 1000, ds_cstr(match),
> +                  ds_cstr(actions), lflow_ref);
> +}
> +
> +/* Check if the smaple has only single collector and the sample action
> + * with registers is supported. */
> +static bool
> +acl_use_generic_sample_flows(const struct nbrec_sample *sample,
> +                             const struct chassis_features *features)
> +{
> +    return sample && sample->n_collectors == 1 && features->sample_with_reg;
> +}
> +
>   /* This builds all ACL sampling related logical flows:
>    * - for packets creating new connections
>    * - for packets that are part of an existing connection
> @@ -6769,6 +6878,7 @@ build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
>                          const struct nbrec_acl *acl,
>                          struct ds *match, struct ds *actions,
>                          const struct sampling_app_table *sampling_apps,
> +                       const struct chassis_features *features,
>                          struct lflow_ref *lflow_ref)
>   {
>       bool should_sample_established =
> @@ -6792,13 +6902,17 @@ build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
>   
>       bool ingress = !strcmp(acl->direction, "from-lport") ? true : false;
>       enum ovn_stage stage;
> +    enum acl_observation_stage obs_stage;
>   
>       if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
>           stage = S_SWITCH_IN_ACL_AFTER_LB_SAMPLE;
> +        obs_stage = ACL_OBS_FROM_LPORT_AFTER_LB;
>       } else if (ingress) {
>           stage = S_SWITCH_IN_ACL_SAMPLE;
> +        obs_stage = ACL_OBS_FROM_LPORT;
>       } else {
>           stage = S_SWITCH_OUT_ACL_SAMPLE;
> +        obs_stage = ACL_OBS_TO_LPORT;
>       }
>   
>       uint8_t sample_new_domain_id = sampling_app_get_id(sampling_apps,
> @@ -6806,14 +6920,28 @@ build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
>       uint8_t sample_est_domain_id = sampling_app_get_id(sampling_apps,
>                                                          SAMPLING_APP_ACL_EST);
>   
> +    if (acl_use_generic_sample_flows(acl->sample_new, features)) {
> +        build_acl_sample_generic_new_flows(od, lflows, stage, obs_stage,
> +                                           match, actions,
> +                                           acl->sample_new->collectors[0],
> +                                           sample_new_domain_id,
> +                                           stateful_match, lflow_ref);
> +    } else {
> +        build_acl_sample_new_flows(od, lflows, stage, match, actions,
> +                                   acl, sample_new_domain_id, stateful_match,
> +                                   lflow_ref);
> +    }
> +
>       if (!stateful_match) {
> -        build_acl_sample_new_stateless_flows(od, lflows, stage, match, actions,
> -                                             acl, sample_new_domain_id,
> -                                             lflow_ref);
> +        return;
> +    }
> +
> +    if (acl_use_generic_sample_flows(acl->sample_est, features)) {
> +        build_acl_sample_generic_est_flows(od, lflows, stage, obs_stage,
> +                                           match, actions,
> +                                           acl->sample_est->collectors[0],
> +                                           sample_est_domain_id, lflow_ref);
>       } else {
> -        build_acl_sample_new_stateful_flows(od, lflows, stage, match, actions,
> -                                            acl, sample_new_domain_id,
> -                                            lflow_ref);
>           build_acl_sample_est_stateful_flows(od, lflows, stage, match, actions,
>                                               acl, sample_est_domain_id,
>                                               lflow_ref);
> @@ -6845,13 +6973,17 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>   {
>       bool ingress = !strcmp(acl->direction, "from-lport") ? true :false;
>       enum ovn_stage stage;
> +    enum acl_observation_stage obs_stage;
>   
>       if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
>           stage = S_SWITCH_IN_ACL_AFTER_LB_EVAL;
> +        obs_stage = ACL_OBS_FROM_LPORT_AFTER_LB;
>       } else if (ingress) {
>           stage = S_SWITCH_IN_ACL_EVAL;
> +        obs_stage = ACL_OBS_FROM_LPORT;
>       } else {
>           stage = S_SWITCH_OUT_ACL_EVAL;
> +        obs_stage = ACL_OBS_TO_LPORT;
>       }
>   
>       const char *verdict;
> @@ -6885,7 +7017,8 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>           || !strcmp(acl->action, "allow-stateless")) {
>   
>           /* For stateless ACLs just sample "new" packets. */
> -        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
> +                                      obs_stage);
>   
>           ds_put_cstr(actions, "next;");
>           ds_put_format(match, "(%s)", acl->match);
> @@ -6924,7 +7057,7 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>   
>           /* For stateful ACLs sample "new" and "established" packets. */
>           build_acl_sample_label_action(actions, acl, acl->sample_new,
> -                                      acl->sample_est);
> +                                      acl->sample_est, obs_stage);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6948,7 +7081,7 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>   
>           /* For stateful ACLs sample "new" and "established" packets. */
>           build_acl_sample_label_action(actions, acl, acl->sample_new,
> -                                      acl->sample_est);
> +                                      acl->sample_est, obs_stage);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6968,7 +7101,8 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>           ds_truncate(actions, log_verdict_len);
>   
>           /* For drop ACLs just sample all packets as "new" packets. */
> -        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
> +                                      obs_stage);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6991,7 +7125,8 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>           ds_truncate(actions, log_verdict_len);
>   
>           /* For drop ACLs just sample all packets as "new" packets. */
> -        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
> +                                      obs_stage);
>           ds_put_cstr(actions, "ct_commit { ct_mark.blocked = 1; }; next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -7237,6 +7372,7 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>              const struct ls_port_group_table *ls_port_groups,
>              const struct shash *meter_groups,
>              const struct sampling_app_table *sampling_apps,
> +           const struct chassis_features *features,
>              struct lflow_ref *lflow_ref)
>   {
>       const char *default_acl_action = default_acl_drop
> @@ -7429,7 +7565,8 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>                        meter_groups, ls_stateful_rec->max_acl_tier,
>                        &match, &actions, lflow_ref);
>           build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
> -                               &match, &actions, sampling_apps, lflow_ref);
> +                               &match, &actions, sampling_apps,
> +                               features, lflow_ref);
>       }
>   
>       const struct ls_port_group *ls_pg =
> @@ -7448,7 +7585,7 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>                                &match, &actions, lflow_ref);
>                   build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
>                                          &match, &actions, sampling_apps,
> -                                       lflow_ref);
> +                                       features, lflow_ref);
>               }
>           }
>       }
> @@ -8111,6 +8248,8 @@ build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
>       ds_put_cstr(&actions,
>                    "ct_commit { "
>                       "ct_mark.blocked = 0; "
> +                    "ct_mark.obs_stage = " REGBIT_ACL_OBS_STAGE "; "
> +                    "ct_mark.obs_collector_id = " REG_OBS_COLLECTOR_ID_EST "; "
>                       "ct_label.obs_point_id = " REG_OBS_POINT_ID_EST "; "
>                     "}; next;");
>       ovn_lflow_add(lflows, od, S_SWITCH_IN_STATEFUL, 100,
> @@ -16161,6 +16300,7 @@ build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
>                           const struct ls_port_group_table *ls_pgs,
>                           const struct shash *meter_groups,
>                           const struct sampling_app_table *sampling_apps,
> +                        const struct chassis_features *features,
>                           struct lflow_table *lflows)
>   {
>       build_ls_stateful_rec_pre_acls(ls_stateful_rec, od, ls_pgs, lflows,
> @@ -16170,7 +16310,7 @@ build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
>       build_acl_hints(ls_stateful_rec, od, lflows,
>                       ls_stateful_rec->lflow_ref);
>       build_acls(ls_stateful_rec, od, lflows, ls_pgs, meter_groups,
> -               sampling_apps, ls_stateful_rec->lflow_ref);
> +               sampling_apps, features, ls_stateful_rec->lflow_ref);
>       build_lb_hairpin(ls_stateful_rec, od, lflows, ls_stateful_rec->lflow_ref);
>   }
>   
> @@ -16487,6 +16627,7 @@ build_lflows_thread(void *arg)
>                                               lsi->ls_port_groups,
>                                               lsi->meter_groups,
>                                               lsi->sampling_apps,
> +                                            lsi->features,
>                                               lsi->lflows);
>                   }
>               }
> @@ -16710,6 +16851,7 @@ build_lswitch_and_lrouter_flows(
>               build_ls_stateful_flows(ls_stateful_rec, od, lsi.ls_port_groups,
>                                       lsi.meter_groups,
>                                       lsi.sampling_apps,
> +                                    lsi.features,
>                                       lsi.lflows);
>           }
>           stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
> @@ -17225,6 +17367,7 @@ lflow_handle_ls_stateful_changes(struct ovsdb_idl_txn *ovnsb_txn,
>                                   lflow_input->ls_port_groups,
>                                   lflow_input->meter_groups,
>                                   lflow_input->sampling_apps,
> +                                lflow_input->features,
>                                   lflows);
>   
>           /* Sync the new flows to SB. */
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 6cc372b8a4..0adc7ee658 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -4609,7 +4609,7 @@ check_stateful_flows() {
>       AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>       AT_CHECK_UNQUOTED([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
> @@ -4633,7 +4633,7 @@ check_stateful_flows() {
>       AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   }
>   
> @@ -4676,7 +4676,7 @@ AT_CHECK([grep "ls_in_lb " sw0flows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
> @@ -4697,7 +4697,7 @@ AT_CHECK([grep "ls_out_pre_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # LB with event=false and reject=false
> @@ -4726,23 +4726,23 @@ ovn-sbctl dump-flows sw0 > sw0flows
>   AT_CAPTURE_FILE([sw0flows])
>   
>   AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
>   ])
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
>   ])
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # Add new ACL without label
> @@ -4753,27 +4753,27 @@ ovn-sbctl dump-flows sw0 > sw0flows
>   AT_CAPTURE_FILE([sw0flows])
>   
>   AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
>     table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
>     table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
>   ])
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
>     table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
>     table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
>   ])
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # Delete new ACL with label
> @@ -4790,7 +4790,7 @@ AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0]
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> @@ -4800,7 +4800,7 @@ AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   AT_CLEANUP
>   ])
> @@ -4828,7 +4828,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls from-lport 1
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AS_BOX([from-lport --apply-after-lb allow-related ACL])
> @@ -4836,7 +4836,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --apply-after-lb --label=1234 acl-add
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AS_BOX([to-lport allow-related ACL])
> @@ -4844,7 +4844,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls to-lport 1 ip
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_out_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AT_CLEANUP
> @@ -7680,7 +7680,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AS_BOX([Remove and add the ACLs back with the apply-after-lb option])
> @@ -7735,7 +7735,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AS_BOX([Remove and add the ACLs back with a few ACLs with apply-after-lb option])
> @@ -7790,7 +7790,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CLEANUP
> @@ -12608,8 +12608,8 @@ ovn-nbctl --wait=sb \
>     --id=@sample2 create Sample collector="$collector1 $collector2" metadata=4302 -- \
>     --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> -  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
>     table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>     table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> @@ -12620,7 +12620,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e l
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       reg9 = 4302;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
>       sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
> @@ -12640,8 +12640,8 @@ ovn-nbctl --wait=sb \
>     --id=@sample1 create Sample collector="$collector1 $collector2" metadata=4301 -- \
>     --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> -  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
>     table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>     table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> @@ -12650,7 +12650,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e l
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       reg9 = 0;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
>       sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
> @@ -12670,8 +12670,8 @@ ovn-nbctl --wait=sb \
>     --id=@sample2 create Sample collector="$collector1 $collector2" metadata=4302 -- \
>     --apply-after-lb --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> -  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> -  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
>     table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>     table=??(ls_in_acl_after_lb_sample), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> @@ -12682,7 +12682,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       reg9 = 4302;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
>       sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
> @@ -12702,8 +12702,8 @@ ovn-nbctl --wait=sb \
>     --id=@sample1 create Sample collector="$collector1 $collector2" metadata=4301 -- \
>     --apply-after-lb --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> -  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> -  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
>     table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>     table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> @@ -12712,7 +12712,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       reg9 = 0;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
>       sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
> @@ -12734,8 +12734,8 @@ ovn-nbctl --wait=sb \
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
>     table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> -  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> -  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
>     table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>     table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> @@ -12744,7 +12744,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       ct_commit { ct_mark.blocked = 0; };
>       reg9 = 4302;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
> @@ -12766,8 +12766,8 @@ ovn-nbctl --wait=sb \
>     --sample-new=@sample1 acl-add ls to-lport 1 "1" allow-related
>   AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
>     table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> -  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> -  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
>     table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
>   ])
> @@ -12775,7 +12775,7 @@ AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e
>   dnl Trace new connections.
>   flow="$base_flow"
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
>       ct_commit { ct_mark.blocked = 0; };
>       reg9 = 0;
>       sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
> @@ -12792,6 +12792,276 @@ AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0],
>   AT_CLEANUP
>   ])
>   
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([ACL Sampling - Generic sample])
> +AT_KEYWORDS([acl])
> +
> +ovn_start
> +
> +collector1=$(ovn-nbctl create Sample_Collector id=1 name=c1 probability=65535 set_id=100)
> +check_row_count nb:Sample_Collector 1
> +
> +ovn-nbctl create Sampling_App type="acl-new" id="42"
> +ovn-nbctl create Sampling_App type="acl-est" id="43"
> +check_row_count nb:Sampling_App 2
> +
> +check ovn-nbctl                               \
> +  -- ls-add ls                                \
> +  -- lsp-add ls lsp1                          \
> +  -- lsp-set-addresses lsp1 00:00:00:00:00:01 \
> +  -- lsp-add ls lsp2                          \
> +  -- lsp-set-addresses lsp2 00:00:00:00:00:02
> +check ovn-nbctl --wait=sb sync
> +
> +base_flow="inport == \"lsp1\" && eth.src == 00:00:00:00:00:01 && eth.dst == 00:00:00:00:00:02 && ip4.src == 42.42.42.1 && ip4.dst == 42.42.42.2"
> +m4_define([TRACE_FILTER], [grep -e sample -e commit -e reg9 -e 'reg8\[[0..7\]]' -e 'reg8\[[8..15\]]' | grep -v _sample | sort])
> +
> +AS_BOX([ACL sampling without register support])
> +check ovn-sbctl chassis-add gw1 geneve 127.0.0.1 \
> +  -- set chassis gw1 other_config:ovn-sample-with-registers="false"
> +
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
> +  --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
> +  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_point_id == 4302"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=4302);
> +])
> +
> +check ovn-sbctl set chassis gw1 other_config:ovn-sample-with-registers="true"
> +
> +AS_BOX([from-lport ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
> +  --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 1"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
> +])
> +
> +AS_BOX([from-lport ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
> +  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +])
> +
> +AS_BOX([from-lport-after-lb ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
> +  --apply-after-lb --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 1 && ct_mark.obs_collector_id == 1"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
> +])
> +
> +AS_BOX([from-lport-after-lb ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --apply-after-lb --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +])
> +
> +AS_BOX([to-lport ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
> +  --sample-new=@sample1 --sample-est=@sample2 acl-add ls to-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 2), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 2 && ct_mark.obs_collector_id == 1"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 1;
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
> +])
> +
> +AS_BOX([to-lport ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +ovn-nbctl --wait=sb                                                    \
> +  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
> +  --sample-new=@sample1 acl-add ls to-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
> +  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; };
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 2 && ct_mark.obs_collector_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
> +    reg8[[0..7]] = 1;
> +    reg8[[8..15]] = 0;
> +    reg9 = 0;
> +])
> +
> +AT_CLEANUP
> +])
> +
>   OVN_FOR_EACH_NORTHD_NO_HV([
>   AT_SETUP([ACL Sampling - same collector set id, multiple probabilities])
>   AT_KEYWORDS([acl])
> @@ -12831,24 +13101,22 @@ check_row_count nb:Sample 6
>   check ovn-nbctl --wait=sb sync
>   
>   AT_CHECK([ovn-sbctl lflow-list | grep probability | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4303), dnl
> -action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4303); next;)
> -  table=??(ls_in_acl_after_lb_sample), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4304 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4304); next;)
> -  table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), dnl
> -action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4301); next;)
> -  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4302); next;)
> -  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4306 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4306); next;)
> -  table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4305), dnl
> -action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4305); next;)
> -  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4306 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4306); next;)
> -  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4302); next;)
> -  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4304 && ct_label.obs_unused == 0), dnl
> -action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4304); next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), dnl
> +action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 1), dnl
> +action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), dnl
> +action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 0), dnl
> +action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 2), dnl
> +action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), dnl
> +action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 2), dnl
> +action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
> +  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 2), dnl
> +action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
>   ])
>   
>   AT_CLEANUP
> diff --git a/tests/ovn.at b/tests/ovn.at
> index f1fc29503f..c8aedfddfc 100644
> --- a/tests/ovn.at
> +++ b/tests/ovn.at
> @@ -336,6 +336,8 @@ ct_mark.blocked = ct_mark[0]
>   ct_mark.ecmp_reply_port = ct_mark[16..31]
>   ct_mark.force_snat = ct_mark[3]
>   ct_mark.natted = ct_mark[1]
> +ct_mark.obs_collector_id = ct_mark[16..23]
> +ct_mark.obs_stage = ct_mark[4..5]
>   ct_mark.skip_snat = ct_mark[2]
>   ct_state = NXM_NX_CT_STATE
>   ]])
> diff --git a/tests/system-ovn.at b/tests/system-ovn.at
> index ef9652f02a..853004f93a 100644
> --- a/tests/system-ovn.at
> +++ b/tests/system-ovn.at
> @@ -7724,7 +7724,7 @@ NS_CHECK_EXEC([sw0-p3], [ping -q -c 10 -i 0.3 -w 15 10.0.0.2 | FORMAT_PING], \
>   AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.2) | \
>   sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
>   sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/'], [0], [dnl
> -icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
> +icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,mark=32,labels=0x4d3000000000000000000000000
>   icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>
>   ])
>   
> @@ -7851,7 +7851,7 @@ NS_CHECK_EXEC([sw0-p1], [ping -q -c 10 -i 0.3 -w 15 10.0.0.4 | FORMAT_PING], \
>   AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.4) | \
>   sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
>   sed -e 's/labels=0x4d2[[0-9a-f]]*/labels=0x4d2000000000000000000000000/'], [0], [dnl
> -icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d2000000000000000000000000
> +icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d2000000000000000000000000
>   icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
>   ])
>   
> @@ -7866,7 +7866,7 @@ NS_CHECK_EXEC([sw0-p3], [ping -q -c 10 -i 0.3 -w 15 10.0.0.2 | FORMAT_PING], \
>   AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.2) | \
>   sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
>   sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/'], [0], [dnl
> -icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
> +icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,mark=32,labels=0x4d3000000000000000000000000
>   icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>
>   ])
>   
> @@ -8081,7 +8081,7 @@ AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.3) | \
>   sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
>   sed -e 's/labels=0x4d2[[0-9a-f]]*/labels=0x4d2000000000000000000000000/' | sort], [0], [dnl
>   icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
> -icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d2000000000000000000000000
> +icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d2000000000000000000000000
>   ])
>   
>   # Add a higher priority ACL with different label.
> @@ -8097,7 +8097,7 @@ AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.3) | \
>   sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
>   sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/' | sort], [0], [dnl
>   icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
> -icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
> +icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d3000000000000000000000000
>   ])
>   
>   OVS_APP_EXIT_AND_WAIT([ovn-controller])
diff mbox series

Patch

diff --git a/include/ovn/logical-fields.h b/include/ovn/logical-fields.h
index ce79b501cf..d6c4a9b6b3 100644
--- a/include/ovn/logical-fields.h
+++ b/include/ovn/logical-fields.h
@@ -197,6 +197,8 @@  const struct ovn_field *ovn_field_from_name(const char *name);
 #define OVN_CT_NATTED_BIT  1
 #define OVN_CT_LB_SKIP_SNAT_BIT 2
 #define OVN_CT_LB_FORCE_SNAT_BIT 3
+#define OVN_CT_OBS_STAGE_1ST_BIT 4
+#define OVN_CT_OBS_STAGE_END_BIT 5
 
 #define OVN_CT_BLOCKED 1
 #define OVN_CT_NATTED  2
diff --git a/lib/logical-fields.c b/lib/logical-fields.c
index 0c187e1c84..134d2674fd 100644
--- a/lib/logical-fields.c
+++ b/lib/logical-fields.c
@@ -165,6 +165,14 @@  ovn_init_symtab(struct shash *symtab)
                                     OVN_CT_STR(OVN_CT_LB_FORCE_SNAT_BIT)
                                     "]",
                                     WR_CT_COMMIT);
+    expr_symtab_add_subfield_scoped(symtab, "ct_mark.obs_stage", NULL,
+                                    "ct_mark["
+                                    OVN_CT_STR(OVN_CT_OBS_STAGE_1ST_BIT) ".."
+                                    OVN_CT_STR(OVN_CT_OBS_STAGE_END_BIT)
+                                    "]",
+                                    WR_CT_COMMIT);
+    expr_symtab_add_subfield_scoped(symtab, "ct_mark.obs_collector_id", NULL,
+                                    "ct_mark[16..23]", WR_CT_COMMIT);
 
     expr_symtab_add_field_scoped(symtab, "ct_label", MFF_CT_LABEL, NULL,
                                  false, WR_CT_COMMIT);
diff --git a/northd/northd.c b/northd/northd.c
index 13f9faba31..70b13c8c98 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -144,8 +144,20 @@  static bool vxlan_mode;
 #define REGBIT_ACL_VERDICT_ALLOW "reg8[16]"
 #define REGBIT_ACL_VERDICT_DROP "reg8[17]"
 #define REGBIT_ACL_VERDICT_REJECT "reg8[18]"
+#define REGBIT_ACL_OBS_STAGE "reg8[19..20]"
 #define REG_ACL_TIER "reg8[30..31]"
 
+enum acl_observation_stage {
+    ACL_OBS_FROM_LPORT          = 0,
+    ACL_OBS_FROM_LPORT_AFTER_LB = 1,
+    ACL_OBS_TO_LPORT            = 2,
+    ACL_OBS_STAGE_MAX
+};
+
+/* enum acl_observation_stage_t values must fit in the 2 bits of
+ * REGBIT_ACL_OBS_STAGE .*/
+BUILD_ASSERT_DECL(ACL_OBS_STAGE_MAX < (1 << 2));
+
 /* Indicate that this packet has been recirculated using egress
  * loopback.  This allows certain checks to be bypassed, such as a
  * logical router dropping packets with source IP address equals
@@ -189,6 +201,8 @@  static bool vxlan_mode;
  * domain and point ID. */
 #define REG_OBS_POINT_ID_NEW "reg3"
 #define REG_OBS_POINT_ID_EST "reg9"
+#define REG_OBS_COLLECTOR_ID_NEW "reg8[0..7]"
+#define REG_OBS_COLLECTOR_ID_EST "reg8[8..15]"
 
 /* Register used for temporarily store ECMP eth.src to avoid masked ct_label
  * access. It doesn't really occupy registers because the content of the
@@ -228,12 +242,13 @@  static bool vxlan_mode;
  * +----+----------------------------------------------+ G |                                   |
  * | R7 |                   UNUSED                     | 1 |                                   |
  * +----+----------------------------------------------+---+-----------------------------------+
- * |    |              LB_AFF_MATCH_PORT               |
- * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  |
- * +----+----------------------------------------------+
- * | R9 |              OBS_POINT_ID_EST                |
- * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |
- * +----+----------------------------------------------+
+ * | R8 |              LB_AFF_MATCH_PORT               | X |      REG_OBS_COLLECTOR_ID_NEW     |
+ * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  | R |      REG_OBS_COLLECTOR_ID_EST     |
+ * |    |                                              | E |  (>= ACL_EVAL* && <= ACL_ACTION*) |
+ * +----+----------------------------------------------+ G +-----------------------------------+
+ * | R9 |              OBS_POINT_ID_EST                | 4 |                                   |
+ * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |   |                                   |
+ * +----+----------------------------------------------+---+-----------------------------------+
  *
  * Logical Router pipeline:
  * +-----+---------------------------+---+-----------------+---+------------------------------------+
@@ -6532,7 +6547,8 @@  build_acl_sample_action(struct ds *actions, const struct nbrec_acl *acl,
 static void
 build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
                               const struct nbrec_sample *sample_new,
-                              const struct nbrec_sample *sample_est)
+                              const struct nbrec_sample *sample_est,
+                              enum acl_observation_stage obs_stage)
 {
     if (!acl->label && !sample_new && !sample_est) {
         return;
@@ -6540,6 +6556,8 @@  build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
 
     uint32_t point_id_new = 0;
     uint32_t point_id_est = 0;
+    uint8_t collector_id_new = 0;
+    uint8_t collector_id_est = 0;
 
     if (acl->label) {
         point_id_new = acl->label;
@@ -6547,16 +6565,27 @@  build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
     } else {
         if (sample_new) {
             point_id_new = sample_new->metadata;
+            if (sample_new->n_collectors == 1) {
+                collector_id_new = sample_new->collectors[0]->id;
+            }
         }
         if (sample_est) {
             point_id_est = sample_est->metadata;
+            if (sample_est->n_collectors == 1) {
+                collector_id_est = sample_est->collectors[0]->id;
+            }
         }
     }
 
     ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
                            REG_OBS_POINT_ID_NEW " = %"PRIu32"; "
-                           REG_OBS_POINT_ID_EST " = %"PRIu32"; ",
-                  point_id_new, point_id_est);
+                           REG_OBS_POINT_ID_EST " = %"PRIu32"; "
+                           REG_OBS_COLLECTOR_ID_NEW " = %"PRIu8"; "
+                           REG_OBS_COLLECTOR_ID_EST " = %"PRIu8"; "
+                           REGBIT_ACL_OBS_STAGE " = %"PRIu8"; ",
+                  point_id_new, point_id_est,
+                  collector_id_new, collector_id_est,
+                  (uint8_t) obs_stage);
 }
 
 /* This builds an ACL logical flow specific match that selects traffic
@@ -6604,46 +6633,16 @@  build_acl_sample_label_match(struct ds *match, const struct nbrec_acl *acl,
 }
 
 /* This builds a logical flow that samples and forwards/drops traffic
- * that hit a stateless ACL ("pass" or "allow-stateless") that has sampling
- * enabled.
+ * that hit a stateless/stateful ACL that has sampling enabled.
  */
 static void
-build_acl_sample_new_stateless_flows(const struct ovn_datapath *od,
-                                     struct lflow_table *lflows,
-                                     enum ovn_stage stage,
-                                     struct ds *match, struct ds *actions,
-                                     const struct nbrec_acl *acl,
-                                     uint8_t sample_domain_id,
-                                     struct lflow_ref *lflow_ref)
-{
-    if (!acl->sample_new) {
-        return;
-    }
-
-    ds_clear(actions);
-    ds_clear(match);
-
-    ds_put_cstr(match, "ip && ");
-    build_acl_sample_register_match(match, acl, acl->sample_new);
-
-    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
-
-    ovn_lflow_add(lflows, od, stage, 1100, ds_cstr(match),
-                  ds_cstr(actions), lflow_ref);
-}
-
-/* This builds a logical flow that samples and forwards/drops traffic
- * that created a new conntrack entry and hit a stateful ACL that has sampling
- * enabled.
- */
-static void
-build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
-                                    struct lflow_table *lflows,
-                                    enum ovn_stage stage,
-                                    struct ds *match, struct ds *actions,
-                                    const struct nbrec_acl *acl,
-                                    uint8_t sample_domain_id,
-                                    struct lflow_ref *lflow_ref)
+build_acl_sample_new_flows(const struct ovn_datapath *od,
+                           struct lflow_table *lflows,
+                           enum ovn_stage stage,
+                           struct ds *match, struct ds *actions,
+                           const struct nbrec_acl *acl,
+                           uint8_t sample_domain_id, bool stateful,
+                           struct lflow_ref *lflow_ref)
 {
     if (!acl->sample_new) {
         return;
@@ -6652,12 +6651,16 @@  build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
     ds_clear(actions);
     ds_clear(match);
 
-    /* Match on new connections.  However, for to-lport ACLs, due to
+    /* Match on new connections.  However, for stateful to-lport ACLs, due to
      * skip_port_from_conntrack() conntrack state might be cleared, so
      * take that into account too. */
-    ds_put_format(match, "ip && %s && ",
-                  stage != S_SWITCH_OUT_ACL_SAMPLE
-                  ? "ct.new" : "(ct.new || !ct.trk)");
+    if (!stateful) {
+        ds_put_format(match, "ip && ");
+    } else if (stage != S_SWITCH_OUT_ACL_SAMPLE) {
+        ds_put_format(match, "ip && ct.new && ");
+    } else {
+        ds_put_format(match, "ip && (ct.new || !ct.trk) && ");
+    }
     build_acl_sample_register_match(match, acl, acl->sample_new);
 
     build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
@@ -6758,6 +6761,112 @@  build_acl_sample_est_stateful_flows(const struct ovn_datapath *od,
 
 static void build_acl_reject_action(struct ds *actions, bool is_ingress);
 
+/* This builds a generic logical flow that samples traffic
+ * that hit a stateless/stateful ACL that has sampling enabled with
+ * single collector and all chassis supporting the sample with match action.
+ */
+static void
+build_acl_sample_generic_new_flows(const struct ovn_datapath *od,
+                                   struct lflow_table *lflows,
+                                   enum ovn_stage stage,
+                                   enum acl_observation_stage obs_stage,
+                                   struct ds *match, struct ds *actions,
+                                   const struct nbrec_sample_collector *coll,
+                                   uint8_t sample_domain_id, bool stateful,
+                                   struct lflow_ref *lflow_ref)
+{
+    ds_clear(match);
+    ds_clear(actions);
+
+    /* Match on new connections.  However, for stateful to-lport ACLs, due to
+     * skip_port_from_conntrack() conntrack state might be cleared, so
+     * take that into account too. */
+    const char *new_conn_match = "ip";
+    if (stateful) {
+        if (stage != S_SWITCH_OUT_ACL_SAMPLE) {
+            new_conn_match = "ip && ct.new";
+        } else {
+            new_conn_match = "ip && (ct.new || !ct.trk)";
+        }
+    }
+
+    ds_put_format(match, "%s && "REG_OBS_COLLECTOR_ID_NEW" == %"PRIu8" && "
+                         REGBIT_ACL_OBS_STAGE " == %"PRIu8, new_conn_match,
+                         (uint8_t) coll->id,
+                         (uint8_t) obs_stage);
+
+    ds_put_format(actions, "sample(probability=%"PRIu16","
+                           "collector_set=%"PRIu8","
+                           "obs_domain=%"PRIu32","
+                           "obs_point="REG_OBS_POINT_ID_NEW");"
+                           " next;",
+                           (uint16_t) coll->probability,
+                           (uint8_t) coll->set_id,
+                           sample_domain_id);
+
+    ovn_lflow_add(lflows, od, stage, stateful ? 1000 : 900, ds_cstr(match),
+                  ds_cstr(actions), lflow_ref);
+}
+
+/* This builds a generic logical flow that samples established traffic
+ * that hit a stateful ACL that has sampling enabled with
+ * single collector and all chassis supporting the sample with match action.
+ */
+static void
+build_acl_sample_generic_est_flows(const struct ovn_datapath *od,
+                                   struct lflow_table *lflows,
+                                   enum ovn_stage stage,
+                                   enum acl_observation_stage obs_stage,
+                                   struct ds *match, struct ds *actions,
+                                   const struct nbrec_sample_collector *coll,
+                                   uint8_t sample_domain_id,
+                                   struct lflow_ref *lflow_ref)
+{
+    ds_clear(match);
+    ds_clear(actions);
+
+    ds_put_cstr(match, "ip && ct.trk && (ct.est || ct.rel) && "
+                       "ct_label.obs_unused == 0 && ");
+
+    size_t match_len = match->length;
+    ds_put_format(match, "!ct.rpl && ct_mark.obs_collector_id == %"PRIu8" && "
+                         "ct_mark.obs_stage == %"PRIu8,
+                         (uint8_t) coll->id,
+                         (uint8_t) obs_stage);
+
+    ds_put_format(actions, "sample(probability=%"PRIu16","
+                           "collector_set=%"PRIu8","
+                           "obs_domain=%"PRIu32","
+                           "obs_point=ct_label.obs_point_id);"
+                           " next;",
+                           (uint16_t) coll->probability,
+                           (uint8_t) coll->set_id,
+                           sample_domain_id);
+
+    ovn_lflow_add(lflows, od, stage, 1000, ds_cstr(match),
+                  ds_cstr(actions), lflow_ref);
+
+    enum ovn_stage rpl_stage = (stage == S_SWITCH_OUT_ACL_SAMPLE
+                                ? S_SWITCH_IN_ACL_SAMPLE
+                                : S_SWITCH_OUT_ACL_SAMPLE);
+
+    ds_truncate(match, match_len);
+    ds_put_format(match, "ct.rpl && ct_mark.obs_collector_id == %"PRIu8,
+                  (uint8_t) coll->id);
+
+    ovn_lflow_add(lflows, od, rpl_stage, 1000, ds_cstr(match),
+                  ds_cstr(actions), lflow_ref);
+}
+
+/* Check if the smaple has only single collector and the sample action
+ * with registers is supported. */
+static bool
+acl_use_generic_sample_flows(const struct nbrec_sample *sample,
+                             const struct chassis_features *features)
+{
+    return sample && sample->n_collectors == 1 && features->sample_with_reg;
+}
+
 /* This builds all ACL sampling related logical flows:
  * - for packets creating new connections
  * - for packets that are part of an existing connection
@@ -6769,6 +6878,7 @@  build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
                        const struct nbrec_acl *acl,
                        struct ds *match, struct ds *actions,
                        const struct sampling_app_table *sampling_apps,
+                       const struct chassis_features *features,
                        struct lflow_ref *lflow_ref)
 {
     bool should_sample_established =
@@ -6792,13 +6902,17 @@  build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
 
     bool ingress = !strcmp(acl->direction, "from-lport") ? true : false;
     enum ovn_stage stage;
+    enum acl_observation_stage obs_stage;
 
     if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
         stage = S_SWITCH_IN_ACL_AFTER_LB_SAMPLE;
+        obs_stage = ACL_OBS_FROM_LPORT_AFTER_LB;
     } else if (ingress) {
         stage = S_SWITCH_IN_ACL_SAMPLE;
+        obs_stage = ACL_OBS_FROM_LPORT;
     } else {
         stage = S_SWITCH_OUT_ACL_SAMPLE;
+        obs_stage = ACL_OBS_TO_LPORT;
     }
 
     uint8_t sample_new_domain_id = sampling_app_get_id(sampling_apps,
@@ -6806,14 +6920,28 @@  build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
     uint8_t sample_est_domain_id = sampling_app_get_id(sampling_apps,
                                                        SAMPLING_APP_ACL_EST);
 
+    if (acl_use_generic_sample_flows(acl->sample_new, features)) {
+        build_acl_sample_generic_new_flows(od, lflows, stage, obs_stage,
+                                           match, actions,
+                                           acl->sample_new->collectors[0],
+                                           sample_new_domain_id,
+                                           stateful_match, lflow_ref);
+    } else {
+        build_acl_sample_new_flows(od, lflows, stage, match, actions,
+                                   acl, sample_new_domain_id, stateful_match,
+                                   lflow_ref);
+    }
+
     if (!stateful_match) {
-        build_acl_sample_new_stateless_flows(od, lflows, stage, match, actions,
-                                             acl, sample_new_domain_id,
-                                             lflow_ref);
+        return;
+    }
+
+    if (acl_use_generic_sample_flows(acl->sample_est, features)) {
+        build_acl_sample_generic_est_flows(od, lflows, stage, obs_stage,
+                                           match, actions,
+                                           acl->sample_est->collectors[0],
+                                           sample_est_domain_id, lflow_ref);
     } else {
-        build_acl_sample_new_stateful_flows(od, lflows, stage, match, actions,
-                                            acl, sample_new_domain_id,
-                                            lflow_ref);
         build_acl_sample_est_stateful_flows(od, lflows, stage, match, actions,
                                             acl, sample_est_domain_id,
                                             lflow_ref);
@@ -6845,13 +6973,17 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
 {
     bool ingress = !strcmp(acl->direction, "from-lport") ? true :false;
     enum ovn_stage stage;
+    enum acl_observation_stage obs_stage;
 
     if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
         stage = S_SWITCH_IN_ACL_AFTER_LB_EVAL;
+        obs_stage = ACL_OBS_FROM_LPORT_AFTER_LB;
     } else if (ingress) {
         stage = S_SWITCH_IN_ACL_EVAL;
+        obs_stage = ACL_OBS_FROM_LPORT;
     } else {
         stage = S_SWITCH_OUT_ACL_EVAL;
+        obs_stage = ACL_OBS_TO_LPORT;
     }
 
     const char *verdict;
@@ -6885,7 +7017,8 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
         || !strcmp(acl->action, "allow-stateless")) {
 
         /* For stateless ACLs just sample "new" packets. */
-        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
+                                      obs_stage);
 
         ds_put_cstr(actions, "next;");
         ds_put_format(match, "(%s)", acl->match);
@@ -6924,7 +7057,7 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
 
         /* For stateful ACLs sample "new" and "established" packets. */
         build_acl_sample_label_action(actions, acl, acl->sample_new,
-                                      acl->sample_est);
+                                      acl->sample_est, obs_stage);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6948,7 +7081,7 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
 
         /* For stateful ACLs sample "new" and "established" packets. */
         build_acl_sample_label_action(actions, acl, acl->sample_new,
-                                      acl->sample_est);
+                                      acl->sample_est, obs_stage);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6968,7 +7101,8 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
         ds_truncate(actions, log_verdict_len);
 
         /* For drop ACLs just sample all packets as "new" packets. */
-        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
+                                      obs_stage);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6991,7 +7125,8 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
         ds_truncate(actions, log_verdict_len);
 
         /* For drop ACLs just sample all packets as "new" packets. */
-        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL,
+                                      obs_stage);
         ds_put_cstr(actions, "ct_commit { ct_mark.blocked = 1; }; next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -7237,6 +7372,7 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
            const struct ls_port_group_table *ls_port_groups,
            const struct shash *meter_groups,
            const struct sampling_app_table *sampling_apps,
+           const struct chassis_features *features,
            struct lflow_ref *lflow_ref)
 {
     const char *default_acl_action = default_acl_drop
@@ -7429,7 +7565,8 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
                      meter_groups, ls_stateful_rec->max_acl_tier,
                      &match, &actions, lflow_ref);
         build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
-                               &match, &actions, sampling_apps, lflow_ref);
+                               &match, &actions, sampling_apps,
+                               features, lflow_ref);
     }
 
     const struct ls_port_group *ls_pg =
@@ -7448,7 +7585,7 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
                              &match, &actions, lflow_ref);
                 build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
                                        &match, &actions, sampling_apps,
-                                       lflow_ref);
+                                       features, lflow_ref);
             }
         }
     }
@@ -8111,6 +8248,8 @@  build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
     ds_put_cstr(&actions,
                  "ct_commit { "
                     "ct_mark.blocked = 0; "
+                    "ct_mark.obs_stage = " REGBIT_ACL_OBS_STAGE "; "
+                    "ct_mark.obs_collector_id = " REG_OBS_COLLECTOR_ID_EST "; "
                     "ct_label.obs_point_id = " REG_OBS_POINT_ID_EST "; "
                   "}; next;");
     ovn_lflow_add(lflows, od, S_SWITCH_IN_STATEFUL, 100,
@@ -16161,6 +16300,7 @@  build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
                         const struct ls_port_group_table *ls_pgs,
                         const struct shash *meter_groups,
                         const struct sampling_app_table *sampling_apps,
+                        const struct chassis_features *features,
                         struct lflow_table *lflows)
 {
     build_ls_stateful_rec_pre_acls(ls_stateful_rec, od, ls_pgs, lflows,
@@ -16170,7 +16310,7 @@  build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
     build_acl_hints(ls_stateful_rec, od, lflows,
                     ls_stateful_rec->lflow_ref);
     build_acls(ls_stateful_rec, od, lflows, ls_pgs, meter_groups,
-               sampling_apps, ls_stateful_rec->lflow_ref);
+               sampling_apps, features, ls_stateful_rec->lflow_ref);
     build_lb_hairpin(ls_stateful_rec, od, lflows, ls_stateful_rec->lflow_ref);
 }
 
@@ -16487,6 +16627,7 @@  build_lflows_thread(void *arg)
                                             lsi->ls_port_groups,
                                             lsi->meter_groups,
                                             lsi->sampling_apps,
+                                            lsi->features,
                                             lsi->lflows);
                 }
             }
@@ -16710,6 +16851,7 @@  build_lswitch_and_lrouter_flows(
             build_ls_stateful_flows(ls_stateful_rec, od, lsi.ls_port_groups,
                                     lsi.meter_groups,
                                     lsi.sampling_apps,
+                                    lsi.features,
                                     lsi.lflows);
         }
         stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
@@ -17225,6 +17367,7 @@  lflow_handle_ls_stateful_changes(struct ovsdb_idl_txn *ovnsb_txn,
                                 lflow_input->ls_port_groups,
                                 lflow_input->meter_groups,
                                 lflow_input->sampling_apps,
+                                lflow_input->features,
                                 lflows);
 
         /* Sync the new flows to SB. */
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 6cc372b8a4..0adc7ee658 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -4609,7 +4609,7 @@  check_stateful_flows() {
     AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
     AT_CHECK_UNQUOTED([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
@@ -4633,7 +4633,7 @@  check_stateful_flows() {
     AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 }
 
@@ -4676,7 +4676,7 @@  AT_CHECK([grep "ls_in_lb " sw0flows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
@@ -4697,7 +4697,7 @@  AT_CHECK([grep "ls_out_pre_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # LB with event=false and reject=false
@@ -4726,23 +4726,23 @@  ovn-sbctl dump-flows sw0 > sw0flows
 AT_CAPTURE_FILE([sw0flows])
 
 AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
 ])
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
 ])
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # Add new ACL without label
@@ -4753,27 +4753,27 @@  ovn-sbctl dump-flows sw0 > sw0flows
 AT_CAPTURE_FILE([sw0flows])
 
 AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
   table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
   table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
 ])
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
   table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
   table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
 ])
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # Delete new ACL with label
@@ -4790,7 +4790,7 @@  AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0]
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
@@ -4800,7 +4800,7 @@  AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 AT_CLEANUP
 ])
@@ -4828,7 +4828,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls from-lport 1
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
 ])
 
 AS_BOX([from-lport --apply-after-lb allow-related ACL])
@@ -4836,7 +4836,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --apply-after-lb --label=1234 acl-add
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
 ])
 
 AS_BOX([to-lport allow-related ACL])
@@ -4844,7 +4844,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls to-lport 1 ip
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_out_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
 ])
 
 AT_CLEANUP
@@ -7680,7 +7680,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AS_BOX([Remove and add the ACLs back with the apply-after-lb option])
@@ -7735,7 +7735,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AS_BOX([Remove and add the ACLs back with a few ACLs with apply-after-lb option])
@@ -7790,7 +7790,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CLEANUP
@@ -12608,8 +12608,8 @@  ovn-nbctl --wait=sb \
   --id=@sample2 create Sample collector="$collector1 $collector2" metadata=4302 -- \
   --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
-  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
   table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
   table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
@@ -12620,7 +12620,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e l
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     reg9 = 4302;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
     sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
@@ -12640,8 +12640,8 @@  ovn-nbctl --wait=sb \
   --id=@sample1 create Sample collector="$collector1 $collector2" metadata=4301 -- \
   --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
-  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
   table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
   table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
@@ -12650,7 +12650,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e l
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     reg9 = 0;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
     sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
@@ -12670,8 +12670,8 @@  ovn-nbctl --wait=sb \
   --id=@sample2 create Sample collector="$collector1 $collector2" metadata=4302 -- \
   --apply-after-lb --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
-  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
-  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
   table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
   table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
   table=??(ls_in_acl_after_lb_sample), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
@@ -12682,7 +12682,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     reg9 = 4302;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
     sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
@@ -12702,8 +12702,8 @@  ovn-nbctl --wait=sb \
   --id=@sample1 create Sample collector="$collector1 $collector2" metadata=4301 -- \
   --apply-after-lb --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
-  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
-  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
   table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
   table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
   table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
@@ -12712,7 +12712,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     reg9 = 0;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
     sample(probability=65535,collector_set=200,obs_domain=42,obs_point=4301);
@@ -12734,8 +12734,8 @@  ovn-nbctl --wait=sb \
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
   table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
-  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
-  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
   table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
   table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
@@ -12744,7 +12744,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     ct_commit { ct_mark.blocked = 0; };
     reg9 = 4302;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
@@ -12766,8 +12766,8 @@  ovn-nbctl --wait=sb \
   --sample-new=@sample1 acl-add ls to-lport 1 "1" allow-related
 AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
   table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
-  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
-  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 0; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
   table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
 ])
@@ -12775,7 +12775,7 @@  AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e
 dnl Trace new connections.
 flow="$base_flow"
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
     ct_commit { ct_mark.blocked = 0; };
     reg9 = 0;
     sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
@@ -12792,6 +12792,276 @@  AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0],
 AT_CLEANUP
 ])
 
+OVN_FOR_EACH_NORTHD_NO_HV([
+AT_SETUP([ACL Sampling - Generic sample])
+AT_KEYWORDS([acl])
+
+ovn_start
+
+collector1=$(ovn-nbctl create Sample_Collector id=1 name=c1 probability=65535 set_id=100)
+check_row_count nb:Sample_Collector 1
+
+ovn-nbctl create Sampling_App type="acl-new" id="42"
+ovn-nbctl create Sampling_App type="acl-est" id="43"
+check_row_count nb:Sampling_App 2
+
+check ovn-nbctl                               \
+  -- ls-add ls                                \
+  -- lsp-add ls lsp1                          \
+  -- lsp-set-addresses lsp1 00:00:00:00:00:01 \
+  -- lsp-add ls lsp2                          \
+  -- lsp-set-addresses lsp2 00:00:00:00:00:02
+check ovn-nbctl --wait=sb sync
+
+base_flow="inport == \"lsp1\" && eth.src == 00:00:00:00:00:01 && eth.dst == 00:00:00:00:00:02 && ip4.src == 42.42.42.1 && ip4.dst == 42.42.42.2"
+m4_define([TRACE_FILTER], [grep -e sample -e commit -e reg9 -e 'reg8\[[0..7\]]' -e 'reg8\[[8..15\]]' | grep -v _sample | sort])
+
+AS_BOX([ACL sampling without register support])
+check ovn-sbctl chassis-add gw1 geneve 127.0.0.1 \
+  -- set chassis gw1 other_config:ovn-sample-with-registers="false"
+
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
+  --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); next;)
+  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_point_id == 4302"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=4302);
+])
+
+check ovn-sbctl set chassis gw1 other_config:ovn-sample-with-registers="true"
+
+AS_BOX([from-lport ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
+  --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 1"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
+])
+
+AS_BOX([from-lport ACL sampling (new)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_sample -e ls_in_acl_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 0; next;)
+  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+])
+
+AS_BOX([from-lport-after-lb ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
+  --apply-after-lb --sample-new=@sample1 --sample-est=@sample2 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 1 && ct_mark.obs_collector_id == 1"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
+])
+
+AS_BOX([from-lport-after-lb ACL sampling (new)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --apply-after-lb --sample-new=@sample1 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_sample -e ls_in_acl_after_lb_eval -e ls_out_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 1; next;)
+  table=??(ls_in_acl_after_lb_sample), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 0 && ct_mark.obs_collector_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+])
+
+AS_BOX([to-lport ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --id=@sample2 create Sample collector="$collector1" metadata=4302 -- \
+  --sample-new=@sample1 --sample-est=@sample2 acl-add ls to-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 1), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; reg8[[0..7]] = 1; reg8[[8..15]] = 1; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 1 && ct_mark.obs_stage == 2), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_point_id == 4302 && ct_mark.obs_stage == 2 && ct_mark.obs_collector_id == 1"
+AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 1;
+    reg9 = 4302;
+    sample(probability=65535,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id);
+])
+
+AS_BOX([to-lport ACL sampling (new)])
+check ovn-nbctl acl-del ls
+ovn-nbctl --wait=sb                                                    \
+  --id=@sample1 create Sample collector="$collector1" metadata=4301 -- \
+  --sample-new=@sample1 acl-add ls to-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_sample -e ls_out_acl_eval -e ls_in_acl_sample | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_sample   ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; reg8[[0..7]] = 1; reg8[[8..15]] = 0; reg8[[19..20]] = 2; next;)
+  table=??(ls_out_acl_sample  ), priority=0    , match=(1), action=(next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=reg3); next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | TRACE_FILTER], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_mark.obs_stage = reg8[[19..20]]; ct_mark.obs_collector_id = reg8[[8..15]]; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; };
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+    sample(probability=65535,collector_set=100,obs_domain=42,obs_point=reg3);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_point_id == 0 && ct_mark.obs_stage == 2 && ct_mark.obs_collector_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | TRACE_FILTER], [0], [dnl
+    reg8[[0..7]] = 1;
+    reg8[[8..15]] = 0;
+    reg9 = 0;
+])
+
+AT_CLEANUP
+])
+
 OVN_FOR_EACH_NORTHD_NO_HV([
 AT_SETUP([ACL Sampling - same collector set id, multiple probabilities])
 AT_KEYWORDS([acl])
@@ -12831,24 +13101,22 @@  check_row_count nb:Sample 6
 check ovn-nbctl --wait=sb sync
 
 AT_CHECK([ovn-sbctl lflow-list | grep probability | ovn_strip_lflows], [0], [dnl
-  table=??(ls_in_acl_after_lb_sample), priority=1100 , match=(ip && ct.new && reg3 == 4303), dnl
-action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4303); next;)
-  table=??(ls_in_acl_after_lb_sample), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4304 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4304); next;)
-  table=??(ls_in_acl_sample   ), priority=1100 , match=(ip && ct.new && reg3 == 4301), dnl
-action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4301); next;)
-  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4302); next;)
-  table=??(ls_in_acl_sample   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4306 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4306); next;)
-  table=??(ls_out_acl_sample  ), priority=1100 , match=(ip && (ct.new || !ct.trk) && reg3 == 4305), dnl
-action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=4305); next;)
-  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && ct_label.obs_point_id == 4306 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4306); next;)
-  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4302); next;)
-  table=??(ls_out_acl_sample  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && ct_label.obs_point_id == 4304 && ct_label.obs_unused == 0), dnl
-action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=4304); next;)
+  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 1), dnl
+action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_in_acl_after_lb_sample), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 1), dnl
+action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.new && reg8[[0..7]] == 1 && reg8[[19..20]] == 0), dnl
+action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 0), dnl
+action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_in_acl_sample   ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 2), dnl
+action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && (ct.new || !ct.trk) && reg8[[0..7]] == 1 && reg8[[19..20]] == 2), dnl
+action=(sample(probability=10000,collector_set=100,obs_domain=42,obs_point=reg3); next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && !ct.rpl && ct_mark.obs_collector_id == 2 && ct_mark.obs_stage == 2), dnl
+action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
+  table=??(ls_out_acl_sample  ), priority=1000 , match=(ip && ct.trk && (ct.est || ct.rel) && ct_label.obs_unused == 0 && ct.rpl && ct_mark.obs_collector_id == 2), dnl
+action=(sample(probability=20000,collector_set=100,obs_domain=43,obs_point=ct_label.obs_point_id); next;)
 ])
 
 AT_CLEANUP
diff --git a/tests/ovn.at b/tests/ovn.at
index f1fc29503f..c8aedfddfc 100644
--- a/tests/ovn.at
+++ b/tests/ovn.at
@@ -336,6 +336,8 @@  ct_mark.blocked = ct_mark[0]
 ct_mark.ecmp_reply_port = ct_mark[16..31]
 ct_mark.force_snat = ct_mark[3]
 ct_mark.natted = ct_mark[1]
+ct_mark.obs_collector_id = ct_mark[16..23]
+ct_mark.obs_stage = ct_mark[4..5]
 ct_mark.skip_snat = ct_mark[2]
 ct_state = NXM_NX_CT_STATE
 ]])
diff --git a/tests/system-ovn.at b/tests/system-ovn.at
index ef9652f02a..853004f93a 100644
--- a/tests/system-ovn.at
+++ b/tests/system-ovn.at
@@ -7724,7 +7724,7 @@  NS_CHECK_EXEC([sw0-p3], [ping -q -c 10 -i 0.3 -w 15 10.0.0.2 | FORMAT_PING], \
 AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.2) | \
 sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
 sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/'], [0], [dnl
-icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
+icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,mark=32,labels=0x4d3000000000000000000000000
 icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>
 ])
 
@@ -7851,7 +7851,7 @@  NS_CHECK_EXEC([sw0-p1], [ping -q -c 10 -i 0.3 -w 15 10.0.0.4 | FORMAT_PING], \
 AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.4) | \
 sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
 sed -e 's/labels=0x4d2[[0-9a-f]]*/labels=0x4d2000000000000000000000000/'], [0], [dnl
-icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d2000000000000000000000000
+icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d2000000000000000000000000
 icmp,orig=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=8,code=0),reply=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
 ])
 
@@ -7866,7 +7866,7 @@  NS_CHECK_EXEC([sw0-p3], [ping -q -c 10 -i 0.3 -w 15 10.0.0.2 | FORMAT_PING], \
 AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.2) | \
 sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
 sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/'], [0], [dnl
-icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
+icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>,mark=32,labels=0x4d3000000000000000000000000
 icmp,orig=(src=10.0.0.4,dst=10.0.0.2,id=<cleared>,type=8,code=0),reply=(src=10.0.0.2,dst=10.0.0.4,id=<cleared>,type=0,code=0),zone=<cleared>
 ])
 
@@ -8081,7 +8081,7 @@  AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.3) | \
 sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
 sed -e 's/labels=0x4d2[[0-9a-f]]*/labels=0x4d2000000000000000000000000/' | sort], [0], [dnl
 icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
-icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d2000000000000000000000000
+icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d2000000000000000000000000
 ])
 
 # Add a higher priority ACL with different label.
@@ -8097,7 +8097,7 @@  AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.0.0.3) | \
 sed -e 's/zone=[[0-9]]*/zone=<cleared>/' | \
 sed -e 's/labels=0x4d3[[0-9a-f]]*/labels=0x4d3000000000000000000000000/' | sort], [0], [dnl
 icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>
-icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,labels=0x4d3000000000000000000000000
+icmp,orig=(src=10.0.0.2,dst=10.0.0.3,id=<cleared>,type=8,code=0),reply=(src=10.0.0.3,dst=10.0.0.2,id=<cleared>,type=0,code=0),zone=<cleared>,mark=16,labels=0x4d3000000000000000000000000
 ])
 
 OVS_APP_EXIT_AND_WAIT([ovn-controller])