diff mbox series

[ovs-dev,v3,8/8] northd: Add ACL Sampling.

Message ID 20240712151416.992033-9-dceara@redhat.com
State Changes Requested
Delegated to: Mark Michelson
Headers show
Series Add ACL Sampling using per-flow IPFIX. | expand

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot-_Build_and_Test fail github build: failed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Dumitru Ceara July 12, 2024, 3:14 p.m. UTC
From: Adrian Moreno <amorenoz@redhat.com>

Introduce a new table called Sample where per-flow IPFIX configuration
can be specified.
Also, reference rows from such table from the ACL table to enable the
configuration of ACL sampling. If enabled, northd will add a sample
action to each ACL related logical flow.

Packets that hit stateful ACLs are sampled in different ways depending
whether they are initiating a new session or are just forwarded on an
existing (already allowed) session.  Two new columns ("sample_new" and
"sample_est") are added to the ACL table to allow for potentially
different sampling rates for the two cases.

Note: If an ACL has both sampling enabled and a label associated to it
then the label value overrides the observation point ID defined in the
sample configuration.  This is a side effect of the implementation
(observation point IDs are stored in conntrack in the same part of the
ct_label where ACL labels are also stored).  The two features
(sampling and ACL labels) serve however similar purposes so it's not
expected that they're both enabled together.

When sampling is enabled on an ACL additional logical flows are created
for that ACL (one for stateless ACLs and 3 for stateful ACLs) in the ACL
action stage of the logical pipeline.  These additional flows match on a
combination of conntrack state values and observation point id values
(either against a logical register or against the stored ct_label state)
in order to determine whether the packets hitting the ACLs must be
sampled or not.  This comes with a slight increase in the number of
logical flows and in the number of OpenFlow rules.  The number of
additional flows _does not_ depend on the original ACL match or action.

New --sample-new and --sample-est optional arguments are added to the
'ovn-nbctl acl-add' command to allow configuring these new types of
sampling for ACLs.

An example workflow of configuring ACL samples is:
  # Create Sampling_App mappings for ACL traffic types:
  ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" \
                                id="42"
  ovn-nbctl create sampling_app name="acl-est-traffic-sampling" \
			        id="43"
  # Create two sample collectors, one that samples all packets (c1)
  # and another one that samples with a probability of 10% (c2):
  c1=$(ovn-nbctl create Sample_Collector name=c1 \
       probability=65535 set_id=1)
  c2=$(ovn-nbctl create Sample_Collector name=c2 \
       probability=6553 set_id=2)
  # Create two sample configurations (for new and for established
  # traffic):
  s1=$(ovn-nbctl create sample collector="$c1 $c2" metadata=4301)
  s2=$(ovn-nbctl create sample collector="$c1 $c2" metadata=4302)
  # Create an ingress ACL to allow IP traffic:
  ovn-nbctl --sample-new=$s1 --sample-est=$s2 acl-add ls \
            from-lport 1 "ip" allow-related

The config above will generate IPFIX samples with:
- 8 MSB of observation domain id set to 42 (Sampling_App
  "acl-new-traffic-sampling" config) and observation point id
  set to 4301 (Sample s1) for packets that create a new
  connection
- 8 MSB of observation domain id set to 43 (Sampling_app
  "acl-est-traffic-sampling" config) and observation point id
  set to 4302 (Sample s2) for packets that are part of an already
  existing connection

Note: in general, all generated IPFIX sample observation domain IDs are
built by ovn-controller in the following way:
The 8 MSB taken from the sample action's obs_domain_id and the last 24
LSB taken from the Southbound logical datapath tunnel_key (datapath ID).

Reported-at: https://issues.redhat.com/browse/FDP-305
Signed-off-by: Adrian Moreno <amorenoz@redhat.com>
Co-authored-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Dumitru Ceara <dceara@redhat.com>
---
V3:
- Addressed Ilya's comment:
  - Bumped NB schema version.
V2:
- Addressed Adrian's comments:
  - fixed up observation domain id comment in commit log.
  - store the obs_domain_id in the ct_label as an 8 bit value (add a
    test).
  - removed redundant check in build_acl_sample_label_action().
  - added missing space after ternary ":" operator.
  - documented limitation for sampling ACLs with action "pass".
  - documented sample_new behavior for stateless ACLs.
- Removed unused OVN_CT_SAMPLE_ID_SET_BIT and OVN_CT_SAMPLE_ID_SET.
---
 NEWS                                   |   3 +
 lib/logical-fields.c                   |   6 +
 northd/northd.c                        | 519 +++++++++++++++++++++++--
 northd/ovn-northd.8.xml                |  26 ++
 ovn-nb.ovsschema                       |  46 ++-
 ovn-nb.xml                             |  63 +++
 tests/atlocal.in                       |   6 +
 tests/ovn-macros.at                    |   4 +
 tests/ovn-nbctl.at                     |  20 +
 tests/ovn-northd.at                    | 240 ++++++++++--
 tests/ovn.at                           |   9 +
 tests/system-common-macros.at          |  11 +
 tests/system-ovn.at                    | 149 +++++++
 utilities/containers/fedora/Dockerfile |   1 +
 utilities/containers/ubuntu/Dockerfile |   1 +
 utilities/ovn-nbctl.8.xml              |   8 +-
 utilities/ovn-nbctl.c                  |  43 +-
 17 files changed, 1091 insertions(+), 64 deletions(-)

Comments

Mark Michelson July 30, 2024, 9:29 p.m. UTC | #1
Based on discussions I've had with devs, this change can result in quite 
a lot of flows being installed. There is an issue [1] that seeks to 
optimize this, but unfortunately this optimization revealed some other 
issues, including in the kernel. Therefore, the optimization is not 
viable to get in for ovn24.09.

Since ACL sampling is opt-in, I'm of the opinion that this feature is 
acceptable to get merged, even if the suggested optimization is not 
acceptable. Opting in comes with the caveat that you may see quite a 
large number of flows installed per as a result. Of course, since 
sampling is configured per-ACL, it's also possible to only opt in on 
certain ACLs instead of all of them, thus not resulting in quite the 
same flow explosion. It may be worth noting in the ovn-nb.xml 
documentation this particular risk.

I had some potential optimization-related suggestions initially as part 
of this review, but since optimization is something that is being 
delayed anyway, I think this can be merged as-is, with one suggestion 
from me in the ovn-nbctl change. See below.

Acked-by: Mark Michelson <mmichels@redhat.com>

[1] https://issues.redhat.com/browse/FDP-709

On 7/12/24 11:14, Dumitru Ceara wrote:
> From: Adrian Moreno <amorenoz@redhat.com>
> 
> Introduce a new table called Sample where per-flow IPFIX configuration
> can be specified.
> Also, reference rows from such table from the ACL table to enable the
> configuration of ACL sampling. If enabled, northd will add a sample
> action to each ACL related logical flow.
> 
> Packets that hit stateful ACLs are sampled in different ways depending
> whether they are initiating a new session or are just forwarded on an
> existing (already allowed) session.  Two new columns ("sample_new" and
> "sample_est") are added to the ACL table to allow for potentially
> different sampling rates for the two cases.
> 
> Note: If an ACL has both sampling enabled and a label associated to it
> then the label value overrides the observation point ID defined in the
> sample configuration.  This is a side effect of the implementation
> (observation point IDs are stored in conntrack in the same part of the
> ct_label where ACL labels are also stored).  The two features
> (sampling and ACL labels) serve however similar purposes so it's not
> expected that they're both enabled together.
> 
> When sampling is enabled on an ACL additional logical flows are created
> for that ACL (one for stateless ACLs and 3 for stateful ACLs) in the ACL
> action stage of the logical pipeline.  These additional flows match on a
> combination of conntrack state values and observation point id values
> (either against a logical register or against the stored ct_label state)
> in order to determine whether the packets hitting the ACLs must be
> sampled or not.  This comes with a slight increase in the number of
> logical flows and in the number of OpenFlow rules.  The number of
> additional flows _does not_ depend on the original ACL match or action.
> 
> New --sample-new and --sample-est optional arguments are added to the
> 'ovn-nbctl acl-add' command to allow configuring these new types of
> sampling for ACLs.
> 
> An example workflow of configuring ACL samples is:
>    # Create Sampling_App mappings for ACL traffic types:
>    ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" \
>                                  id="42"
>    ovn-nbctl create sampling_app name="acl-est-traffic-sampling" \
> 			        id="43"
>    # Create two sample collectors, one that samples all packets (c1)
>    # and another one that samples with a probability of 10% (c2):
>    c1=$(ovn-nbctl create Sample_Collector name=c1 \
>         probability=65535 set_id=1)
>    c2=$(ovn-nbctl create Sample_Collector name=c2 \
>         probability=6553 set_id=2)
>    # Create two sample configurations (for new and for established
>    # traffic):
>    s1=$(ovn-nbctl create sample collector="$c1 $c2" metadata=4301)
>    s2=$(ovn-nbctl create sample collector="$c1 $c2" metadata=4302)
>    # Create an ingress ACL to allow IP traffic:
>    ovn-nbctl --sample-new=$s1 --sample-est=$s2 acl-add ls \
>              from-lport 1 "ip" allow-related
> 
> The config above will generate IPFIX samples with:
> - 8 MSB of observation domain id set to 42 (Sampling_App
>    "acl-new-traffic-sampling" config) and observation point id
>    set to 4301 (Sample s1) for packets that create a new
>    connection
> - 8 MSB of observation domain id set to 43 (Sampling_app
>    "acl-est-traffic-sampling" config) and observation point id
>    set to 4302 (Sample s2) for packets that are part of an already
>    existing connection
> 
> Note: in general, all generated IPFIX sample observation domain IDs are
> built by ovn-controller in the following way:
> The 8 MSB taken from the sample action's obs_domain_id and the last 24
> LSB taken from the Southbound logical datapath tunnel_key (datapath ID).
> 
> Reported-at: https://issues.redhat.com/browse/FDP-305
> Signed-off-by: Adrian Moreno <amorenoz@redhat.com>
> Co-authored-by: Dumitru Ceara <dceara@redhat.com>
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> ---
> V3:
> - Addressed Ilya's comment:
>    - Bumped NB schema version.
> V2:
> - Addressed Adrian's comments:
>    - fixed up observation domain id comment in commit log.
>    - store the obs_domain_id in the ct_label as an 8 bit value (add a
>      test).
>    - removed redundant check in build_acl_sample_label_action().
>    - added missing space after ternary ":" operator.
>    - documented limitation for sampling ACLs with action "pass".
>    - documented sample_new behavior for stateless ACLs.
> - Removed unused OVN_CT_SAMPLE_ID_SET_BIT and OVN_CT_SAMPLE_ID_SET.
> ---
>   NEWS                                   |   3 +
>   lib/logical-fields.c                   |   6 +
>   northd/northd.c                        | 519 +++++++++++++++++++++++--
>   northd/ovn-northd.8.xml                |  26 ++
>   ovn-nb.ovsschema                       |  46 ++-
>   ovn-nb.xml                             |  63 +++
>   tests/atlocal.in                       |   6 +
>   tests/ovn-macros.at                    |   4 +
>   tests/ovn-nbctl.at                     |  20 +
>   tests/ovn-northd.at                    | 240 ++++++++++--
>   tests/ovn.at                           |   9 +
>   tests/system-common-macros.at          |  11 +
>   tests/system-ovn.at                    | 149 +++++++
>   utilities/containers/fedora/Dockerfile |   1 +
>   utilities/containers/ubuntu/Dockerfile |   1 +
>   utilities/ovn-nbctl.8.xml              |   8 +-
>   utilities/ovn-nbctl.c                  |  43 +-
>   17 files changed, 1091 insertions(+), 64 deletions(-)
> 
> diff --git a/NEWS b/NEWS
> index fcf182bc02..7899c623f2 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -41,6 +41,9 @@ Post v24.03.0
>     - The NB_Global.debug_drop_domain_id configured value is now overridden by
>       the ID associated with the Sampling_App record created for drop sampling
>       (Sampling_App.name configured to be "drop-sampling").
> +  - Add support for ACL sampling through the new Sample_Collector and Sample
> +    tables.  Sampling is supported for both traffic that creates new
> +    connections and for traffic that is part of an existing connection.
>   
>   OVN v24.03.0 - 01 Mar 2024
>   --------------------------
> diff --git a/lib/logical-fields.c b/lib/logical-fields.c
> index 4acf8a677e..05292eb4aa 100644
> --- a/lib/logical-fields.c
> +++ b/lib/logical-fields.c
> @@ -175,6 +175,12 @@ ovn_init_symtab(struct shash *symtab)
>                                       WR_CT_COMMIT);
>       expr_symtab_add_subfield_scoped(symtab, "ct_label.label", NULL,
>                                       "ct_label[96..127]", WR_CT_COMMIT);
> +    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_point_id", NULL,
> +                                    "ct_label[96..127]", WR_CT_COMMIT);
> +    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_domain_id", NULL,
> +                                    "ct_label[88..95]", WR_CT_COMMIT);
> +    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_unused", NULL,
> +                                    "ct_label[0..87]", WR_CT_COMMIT);
>   
>       expr_symtab_add_field(symtab, "ct_state", MFF_CT_STATE, NULL, false);
>   
> diff --git a/northd/northd.c b/northd/northd.c
> index 901b9e9cd1..80b9164460 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -50,6 +50,7 @@
>   #include "en-lr-nat.h"
>   #include "en-lr-stateful.h"
>   #include "en-ls-stateful.h"
> +#include "en-sampling-app.h"
>   #include "lib/ovn-parallel-hmap.h"
>   #include "ovn/actions.h"
>   #include "ovn/features.h"
> @@ -184,8 +185,10 @@ static bool vxlan_mode;
>   
>   #define REG_ORIG_TP_DPORT_ROUTER   "reg9[16..31]"
>   
> -/* Register used for setting a label for ACLs in a Logical Switch. */
> -#define REG_LABEL "reg3"
> +/* Registers used for pasing observability information for switches:
> + * domain and point ID. */
> +#define REG_OBS_POINT_ID_NEW "reg3"
> +#define REG_OBS_POINT_ID_EST "reg9"
>   
>   /* Register used for temporarily store ECMP eth.src to avoid masked ct_label
>    * access. It doesn't really occupy registers because the content of the
> @@ -209,13 +212,13 @@ static bool vxlan_mode;
>    * |    |     REGBIT_{HAIRPIN/HAIRPIN_REPLY}           |   |                                   |
>    * |    | REGBIT_ACL_HINT_{ALLOW_NEW/ALLOW/DROP/BLOCK} |   |                                   |
>    * |    |     REGBIT_ACL_{LABEL/STATELESS}             | X |                                   |
> - * +----+----------------------------------------------+ X |                                   |
> - * | R5 |                   UNUSED                     | X |       LB_L2_AFF_BACKEND_IP6       |
> - * | R1 |         ORIG_DIP_IPV4 (>= IN_PRE_STATEFUL)   | R |                                   |
> - * +----+----------------------------------------------+ E |                                   |
> + * +----+----------------------------------------------+ X |       LB_L2_AFF_BACKEND_IP6       |
> + * | R1 |         ORIG_DIP_IPV4 (>= IN_PRE_STATEFUL)   | R |        (>= IN_LB_AFF_CHECK &&     |
> + * +----+----------------------------------------------+ E |         <= IN_LB_AFF_LEARN)       |
>    * | R2 |         ORIG_TP_DPORT (>= IN_PRE_STATEFUL)   | G |                                   |
>    * +----+----------------------------------------------+ 0 |                                   |
> - * | R3 |                  ACL LABEL                   |   |                                   |
> + * | R3 |             OBS_POINT_ID_NEW                 |   |                                   |
> + * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |   |                                   |
>    * +----+----------------------------------------------+---+-----------------------------------+
>    * | R4 |            REG_LB_AFF_BACKEND_IP4            |   |                                   |
>    * +----+----------------------------------------------+ X |                                   |
> @@ -225,9 +228,11 @@ static bool vxlan_mode;
>    * +----+----------------------------------------------+ G |                                   |
>    * | R7 |                   UNUSED                     | 1 |                                   |
>    * +----+----------------------------------------------+---+-----------------------------------+
> - * | R8 |              LB_AFF_MATCH_PORT               |
> + * |    |              LB_AFF_MATCH_PORT               |
> + * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  |
>    * +----+----------------------------------------------+
> - * | R9 |                   UNUSED                     |
> + * | R9 |              OBS_POINT_ID_EST                |
> + * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |
>    * +----+----------------------------------------------+
>    *
>    * Logical Router pipeline:
> @@ -6461,6 +6466,409 @@ build_acl_log(struct ds *actions, const struct nbrec_acl *acl,
>       ds_put_cstr(actions, "); ");
>   }
>   
> +/* This builds an ACL specific sample action.
> + * If the ACL has a label configured the label itself is used as sample
> + * observation point ID.  Otherwise the configured 'sample->metadata'
> + * is passed as observation point ID. */
> +static void
> +build_acl_sample_action(struct ds *actions, const struct nbrec_acl *acl,
> +                        const struct nbrec_sample *sample,
> +                        uint8_t sample_domain_id)
> +{
> +    if (!sample || sample_domain_id == SAMPLING_APP_ID_NONE) {
> +        return;
> +    }
> +
> +    uint32_t domain_id = 0;
> +    uint32_t point_id = 0;
> +
> +    if (acl->label) {
> +        domain_id = 0;
> +        point_id = acl->label;
> +    } else if (sample) {
> +        domain_id = sample_domain_id;
> +        point_id = sample->metadata;
> +    }
> +
> +    for (size_t i = 0; i < sample->n_collectors; i++) {
> +        ds_put_format(actions, "sample(probability=%"PRIu16","
> +                               "collector_set=%hd,"
> +                               "obs_domain=%"PRIu32","
> +                               "obs_point=%"PRIu32");",
> +                               (uint16_t) sample->collectors[i]->probability,
> +                               (uint32_t) sample->collectors[i]->set_id,
> +                               domain_id, point_id);
> +    }
> +}
> +
> +/* This builds an ACL logical flow specific action that stores the observation
> + * point IDs to be used for samples generated for traffic that hits the ACL.
> + * Two observation point IDs are stored in registers, the one for traffic
> + * that creates new connections and the one for traffic that's part of an
> + * existing connection.
> + */
> +static void
> +build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
> +                              const struct nbrec_sample *sample_new,
> +                              const struct nbrec_sample *sample_est)
> +{
> +    if (!acl->label && !sample_new && !sample_est) {
> +        return;
> +    }
> +
> +    uint32_t point_id_new = 0;
> +    uint32_t point_id_est = 0;
> +
> +    if (acl->label) {
> +        point_id_new = acl->label;
> +        point_id_est = acl->label;
> +    } else {
> +        if (sample_new) {
> +            point_id_new = sample_new->metadata;
> +        }
> +        if (sample_est) {
> +            point_id_est = sample_est->metadata;
> +        }
> +    }
> +
> +    ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
> +                           REG_OBS_POINT_ID_NEW " = %"PRIu32"; "
> +                           REG_OBS_POINT_ID_EST " = %"PRIu32"; ",
> +                  point_id_new, point_id_est);
> +}
> +
> +/* This builds an ACL logical flow specific match that selects traffic
> + * with an associated observation point ID register equal to that of the
> + * ACL label (if configured) or sample->metadata.
> + */
> +static void
> +build_acl_sample_register_match(struct ds *match, const struct nbrec_acl *acl,
> +                                const struct nbrec_sample *sample)
> +{
> +    uint32_t point_id = 0;
> +
> +    if (acl->label) {
> +        point_id = acl->label;
> +    } else if (sample) {
> +        point_id = sample->metadata;
> +    }
> +
> +    ds_put_format(match, REG_OBS_POINT_ID_NEW " == %"PRIu32, point_id);
> +}
> +
> +/* This builds an ACL logical flow specific match that selects conntracked
> + * traffic whose associated ct_label.obs_point ID is equal to that of the
> + * ACL label (if configured) or sample->metadata.  The match also ensures
> + * that the observation domain ID stored in the ct_label is also equal to
> + * 'sample_domain_id'.
> + */
> +static void
> +build_acl_sample_label_match(struct ds *match, const struct nbrec_acl *acl,
> +                             const struct nbrec_sample *sample,
> +                             uint8_t sample_domain_id)
> +{
> +    uint32_t domain_id = 0;
> +    uint32_t point_id = 0;
> +
> +    if (acl->label) {
> +        domain_id = 0;
> +        point_id = acl->label;
> +    } else if (sample) {
> +        domain_id = sample_domain_id;
> +        point_id = sample->metadata;
> +    }
> +
> +    /* Match on the complete ct_label to avoid masked access to it in the
> +     * datapath.  Some NICs do not support HW offloading when masked-access
> +     * of ct_label is used in the datapath. */
> +    ds_put_format(match, "ct_label.obs_domain_id == %"PRIu32" && "
> +                         "ct_label.obs_point_id == %"PRIu32" && "
> +                         "ct_label.obs_unused == 0",
> +                  domain_id, point_id);
> +}
> +
> +/* This builds a logical flow that samples and forwards/drops traffic
> + * that hit a stateless ACL ("pass" or "allow-stateless") that has sampling
> + * enabled.
> + */
> +static void
> +build_acl_sample_new_stateless_flows(const struct ovn_datapath *od,
> +                                     struct lflow_table *lflows,
> +                                     enum ovn_stage stage,
> +                                     struct ds *match, struct ds *actions,
> +                                     const struct nbrec_acl *acl,
> +                                     const char *reg_verdict,
> +                                     const char *verdict_actions,
> +                                     const char *copp_meter,
> +                                     uint8_t sample_domain_id,
> +                                     struct lflow_ref *lflow_ref)
> +{
> +    if (!acl->sample_new) {
> +        return;
> +    }
> +
> +    ds_clear(actions);
> +    ds_clear(match);
> +
> +    ds_put_format(match, "ip && %s == 1 && ", reg_verdict);
> +    build_acl_sample_register_match(match, acl, acl->sample_new);
> +
> +    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
> +    ds_put_format(actions, " %s", verdict_actions);
> +
> +    ovn_lflow_metered(lflows, od, stage, 1100,
> +                      ds_cstr(match), ds_cstr(actions),
> +                      copp_meter, lflow_ref);
> +}
> +
> +/* This builds a logical flow that samples and forwards/drops traffic
> + * that created a new conntrack entry and hit a stateful ACL that has sampling
> + * enabled.
> + */
> +static void
> +build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
> +                                    struct lflow_table *lflows,
> +                                    enum ovn_stage stage,
> +                                    struct ds *match, struct ds *actions,
> +                                    const struct nbrec_acl *acl,
> +                                    const char *reg_verdict,
> +                                    const char *verdict_actions,
> +                                    const char *copp_meter,
> +                                    uint8_t sample_domain_id,
> +                                    struct lflow_ref *lflow_ref)
> +{
> +    if (!acl->sample_new) {
> +        return;
> +    }
> +
> +    ds_clear(actions);
> +    ds_clear(match);
> +
> +    ds_put_format(match, "ip && ct.new && %s == 1 && ", reg_verdict);
> +    build_acl_sample_register_match(match, acl, acl->sample_new);
> +
> +    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
> +    ds_put_format(actions, " %s", verdict_actions);
> +
> +    ovn_lflow_metered(lflows, od, stage, 1100,
> +                      ds_cstr(match), ds_cstr(actions),
> +                      copp_meter, lflow_ref);
> +}
> +
> +/* This builds a logical flow that samples and forwards traffic
> + * that is part of an existing connection (in the original direction) created
> + * by traffic allowed by a stateful ACL that has sampling enabled.
> + */
> +static void
> +build_acl_sample_est_orig_stateful_flows(const struct ovn_datapath *od,
> +                                         struct lflow_table *lflows,
> +                                         enum ovn_stage stage,
> +                                         struct ds *match, struct ds *actions,
> +                                         const struct nbrec_acl *acl,
> +                                         const char *reg_verdict,
> +                                         const char *verdict_actions,
> +                                         const char *copp_meter,
> +                                         uint8_t sample_domain_id,
> +                                         struct lflow_ref *lflow_ref)
> +{
> +    ds_clear(actions);
> +    ds_clear(match);
> +
> +    ds_put_format(match, "ip && ct.trk && "
> +                         "(ct.est || ct.rel) && "
> +                         "!ct.rpl && %s == 1 && ",
> +                  reg_verdict);
> +    build_acl_sample_label_match(match, acl, acl->sample_est,
> +                                 sample_domain_id);
> +
> +    build_acl_sample_action(actions, acl, acl->sample_est, sample_domain_id);
> +    ds_put_format(actions, " %s", verdict_actions);
> +
> +    ovn_lflow_metered(lflows, od, stage, 1200,
> +                      ds_cstr(match), ds_cstr(actions),
> +                      copp_meter, lflow_ref);
> +}
> +
> +/* This builds a logical flow that samples and forwards traffic
> + * that is part of an existing connection (in the reply direction) created
> + * by traffic allowed by a stateful ACL that has sampling enabled.
> + *
> + * NOTE: unlike for traffic in the original direction, this logical flow must
> + * be installed in the "opposite" pipeline.  That is, for "from-lport" ACLs
> + * the conntrack entry is created in the ingress logical port zone and will be
> + * hit by reply traffic in the egress pipeline (before being sent out that
> + * logical port).
> + */
> +static void
> +build_acl_sample_est_rpl_stateful_flows(const struct ovn_datapath *od,
> +                                        struct lflow_table *lflows,
> +                                        enum ovn_stage rpl_stage,
> +                                        struct ds *match, struct ds *actions,
> +                                        const struct nbrec_acl *acl,
> +                                        const char *reg_verdict,
> +                                        const char *verdict_actions,
> +                                        const char *copp_meter,
> +                                        uint8_t sample_domain_id,
> +                                        struct lflow_ref *lflow_ref)
> +{
> +    ds_clear(actions);
> +    ds_clear(match);
> +
> +    ds_put_format(match, "ip && ct.trk && "
> +                         "(ct.est || ct.rel) && "
> +                         "ct.rpl && %s == 1 && ",
> +                  reg_verdict);
> +    build_acl_sample_label_match(match, acl, acl->sample_est,
> +                                 sample_domain_id);
> +
> +    build_acl_sample_action(actions, acl, acl->sample_est, sample_domain_id);
> +    ds_put_format(actions, " %s", verdict_actions);
> +
> +    ovn_lflow_metered(lflows, od, rpl_stage, 1200,
> +                      ds_cstr(match), ds_cstr(actions),
> +                      copp_meter, lflow_ref);
> +}
> +
> +/* This builds logical flows that sample and forward traffic
> + * that is part of an existing connection (both in the original and in the
> + * reply direction) created by traffic allowed by a stateful ACL that has
> + * sampling enabled.
> + */
> +static void
> +build_acl_sample_est_stateful_flows(const struct ovn_datapath *od,
> +                                    struct lflow_table *lflows,
> +                                    enum ovn_stage stage,
> +                                    struct ds *match, struct ds *actions,
> +                                    const struct nbrec_acl *acl,
> +                                    const char *reg_verdict,
> +                                    const char *verdict_actions,
> +                                    const char *copp_meter,
> +                                    uint8_t sample_domain_id,
> +                                    struct lflow_ref *lflow_ref)
> +{
> +    if (!acl->sample_est) {
> +        return;
> +    }
> +    build_acl_sample_est_orig_stateful_flows(od, lflows, stage, match, actions,
> +                                             acl, reg_verdict, verdict_actions,
> +                                             copp_meter, sample_domain_id,
> +                                             lflow_ref);
> +
> +    /* Install flows in the "opposite" pipeline direction to handle reply
> +     * traffic on established connections. */
> +    enum ovn_stage rpl_stage = (stage == S_SWITCH_OUT_ACL_ACTION
> +                                ? S_SWITCH_IN_ACL_ACTION
> +                                : S_SWITCH_OUT_ACL_ACTION);
> +    build_acl_sample_est_rpl_stateful_flows(od, lflows, rpl_stage,
> +                                            match, actions,
> +                                            acl, reg_verdict, verdict_actions,
> +                                            copp_meter, sample_domain_id,
> +                                            lflow_ref);
> +}
> +
> +static void build_acl_reject_action(struct ds *actions, bool is_ingress);
> +
> +/* This builds all ACL sampling related logical flows:
> + * - for packets creating new connections
> + * - for packets that are part of an existing connection
> + */
> +static void
> +build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
> +                       const struct ovn_datapath *od,
> +                       struct lflow_table *lflows,
> +                       const struct nbrec_acl *acl,
> +                       const struct shash *meter_groups,
> +                       struct ds *match, struct ds *actions,
> +                       const struct sampling_app_table *sampling_apps,
> +                       struct lflow_ref *lflow_ref)
> +{
> +    bool should_sample_established =
> +        ls_stateful_rec->has_stateful_acl
> +        && acl->sample_est
> +        && !strcmp(acl->action, "allow-related");
> +
> +    bool stateful_match =
> +        ls_stateful_rec->has_stateful_acl
> +        && strcmp(acl->action, "allow-stateless");
> +
> +    /* Only sample if:
> +     * - sampling is enabled for traffic creating new connections
> +     * OR
> +     * - sampling is enabled for traffic on established sessions and the
> +     *   switch has stateful ACLs.
> +     */
> +    if (!acl->sample_new && !should_sample_established) {
> +        return;
> +    }
> +
> +    bool ingress = !strcmp(acl->direction, "from-lport") ? true : false;
> +    enum ovn_stage stage;
> +
> +    if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
> +        stage = S_SWITCH_IN_ACL_AFTER_LB_ACTION;
> +    } else if (ingress) {
> +        stage = S_SWITCH_IN_ACL_ACTION;
> +    } else {
> +        stage = S_SWITCH_OUT_ACL_ACTION;
> +    }
> +
> +    struct ds verdict_actions = DS_EMPTY_INITIALIZER;
> +    ds_put_cstr(&verdict_actions, REGBIT_ACL_VERDICT_ALLOW " = 0; "
> +                                  REGBIT_ACL_VERDICT_DROP " = 0; "
> +                                  REGBIT_ACL_VERDICT_REJECT " = 0; ");
> +    if (ls_stateful_rec->max_acl_tier) {
> +        ds_put_cstr(&verdict_actions, REG_ACL_TIER " = 0; ");
> +    }
> +
> +    /* Follow the same verdict determination logic as in
> +     * build_acl_action_lflows().
> +     */
> +    const char *reg_verdict = REGBIT_ACL_VERDICT_ALLOW;
> +    const char *copp_meter = NULL;
> +    if (!strcmp(acl->action, "drop")) {
> +        reg_verdict = REGBIT_ACL_VERDICT_DROP;
> +        ds_put_cstr(&verdict_actions, debug_implicit_drop_action());
> +    } else if (!strcmp(acl->action, "reject")) {
> +        reg_verdict = REGBIT_ACL_VERDICT_REJECT;
> +        copp_meter = copp_meter_get(COPP_REJECT, od->nbs->copp, meter_groups);
> +        build_acl_reject_action(&verdict_actions, ingress);
> +    } else if (!strcmp(acl->action, "allow")
> +               || !strcmp(acl->action, "allow-related")) {
> +        reg_verdict = REGBIT_ACL_VERDICT_ALLOW;
> +        ds_put_cstr(&verdict_actions, "next;");
> +    } else {
> +        /* XXX: ACLs with action "pass" do not support sampling. */
> +        goto done;
> +    }
> +
> +    uint8_t sample_new_domain_id =
> +        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_NEW_TRAFFIC);
> +    uint8_t sample_est_domain_id =
> +        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_EST_TRAFFIC);
> +
> +    if (!stateful_match) {
> +        build_acl_sample_new_stateless_flows(od, lflows, stage, match, actions,
> +                                             acl, reg_verdict,
> +                                             ds_cstr_ro(&verdict_actions),
> +                                             copp_meter, sample_new_domain_id,
> +                                             lflow_ref);
> +    } else {
> +        build_acl_sample_new_stateful_flows(od, lflows, stage, match, actions,
> +                                            acl, reg_verdict,
> +                                            ds_cstr_ro(&verdict_actions),
> +                                            copp_meter, sample_new_domain_id,
> +                                            lflow_ref);
> +        build_acl_sample_est_stateful_flows(od, lflows, stage, match, actions,
> +                                            acl, reg_verdict,
> +                                            ds_cstr_ro(&verdict_actions),
> +                                            copp_meter, sample_est_domain_id,
> +                                            lflow_ref);
> +    }
> +
> +done:
> +    ds_destroy(&verdict_actions);
> +}
> +
>   static void
>   consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>                const struct nbrec_acl *acl, bool has_stateful,
> @@ -6508,6 +6916,10 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>       if (!has_stateful
>           || !strcmp(acl->action, "pass")
>           || !strcmp(acl->action, "allow-stateless")) {
> +
> +        /* For stateless ACLs just sample "new" packets. */
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
> +
>           ds_put_cstr(actions, "next;");
>           ds_put_format(match, "(%s)", acl->match);
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
> @@ -6542,10 +6954,10 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>   
>           ds_truncate(actions, log_verdict_len);
>           ds_put_cstr(actions, REGBIT_CONNTRACK_COMMIT" = 1; ");
> -        if (acl->label) {
> -            ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
> -                          REG_LABEL" = %"PRId64"; ", acl->label);
> -        }
> +
> +        /* For stateful ACLs sample "new" and "established" packets. */
> +        build_acl_sample_label_action(actions, acl, acl->sample_new,
> +                                      acl->sample_est);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6565,9 +6977,11 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>                         acl->match);
>           if (acl->label) {
>               ds_put_cstr(actions, REGBIT_CONNTRACK_COMMIT" = 1; ");
> -            ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
> -                          REG_LABEL" = %"PRId64"; ", acl->label);
>           }
> +
> +        /* For stateful ACLs sample "new" and "established" packets. */
> +        build_acl_sample_label_action(actions, acl, acl->sample_new,
> +                                      acl->sample_est);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6585,6 +6999,9 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>           ds_put_format(match, " && (%s)", acl->match);
>   
>           ds_truncate(actions, log_verdict_len);
> +
> +        /* For drop ACLs just sample all packets as "new" packets. */
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
>           ds_put_cstr(actions, "next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6605,6 +7022,9 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
>           ds_put_format(match, " && (%s)", acl->match);
>   
>           ds_truncate(actions, log_verdict_len);
> +
> +        /* For drop ACLs just sample all packets as "new" packets. */
> +        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
>           ds_put_cstr(actions, "ct_commit { ct_mark.blocked = 1; }; next;");
>           ovn_lflow_add_with_hint(lflows, od, stage, priority,
>                                   ds_cstr(match), ds_cstr(actions),
> @@ -6685,6 +7105,20 @@ ovn_update_ipv6_options(struct hmap *lr_ports)
>   
>   #define IPV6_CT_OMIT_MATCH "nd || nd_ra || nd_rs || mldv1 || mldv2"
>   
> +static void
> +build_acl_reject_action(struct ds *actions, bool is_ingress)
> +{
> +    ds_put_format(
> +        actions, "reg0 = 0; "
> +        "reject { "
> +          "/* eth.dst <-> eth.src; ip.dst <-> ip.src; is implicit. */ "
> +          "outport <-> inport; next(pipeline=%s,table=%d); "
> +        "};",
> +        is_ingress ? "egress" : "ingress",
> +        is_ingress ? ovn_stage_get_table(S_SWITCH_OUT_QOS)
> +            : ovn_stage_get_table(S_SWITCH_IN_L2_LKUP));
> +}
> +
>   static void
>   build_acl_action_lflows(const struct ls_stateful_record *ls_stateful_rec,
>                           const struct ovn_datapath *od,
> @@ -6731,14 +7165,7 @@ build_acl_action_lflows(const struct ls_stateful_record *ls_stateful_rec,
>           bool ingress = ovn_stage_get_pipeline(stage) == P_IN;
>   
>           ds_truncate(actions, verdict_len);
> -        ds_put_format(
> -            actions, "reg0 = 0; "
> -            "reject { "
> -            "/* eth.dst <-> eth.src; ip.dst <-> ip.src; is implicit. */ "
> -            "outport <-> inport; next(pipeline=%s,table=%d); };",
> -            ingress ? "egress" : "ingress",
> -            ingress ? ovn_stage_get_table(S_SWITCH_OUT_QOS)
> -                : ovn_stage_get_table(S_SWITCH_IN_L2_LKUP));
> +        build_acl_reject_action(actions, ingress);
>   
>           ovn_lflow_metered(lflows, od, stage, 1000,
>                             REGBIT_ACL_VERDICT_REJECT " == 1", ds_cstr(actions),
> @@ -6778,12 +7205,6 @@ build_acl_log_related_flows(const struct ovn_datapath *od,
>        * the ACL, then we need to ensure that the related and reply
>        * traffic is logged, so we install a slightly higher-priority
>        * flow that matches the ACL, allows the traffic, and logs it.
> -     *
> -     * Note: Matching the ct_label.label may prevent OVS flow HW
> -     * offloading to work for some NICs because masked-access of
> -     * ct_label is not supported on those NICs due to HW
> -     * limitations. In such case the user may choose to avoid using the
> -     * "log-related" option.
>        */
>       bool ingress = !strcmp(acl->direction, "from-lport") ? true :false;
>       bool log_related = smap_get_bool(&acl->options, "log-related",
> @@ -6842,6 +7263,7 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>              struct lflow_table *lflows,
>              const struct ls_port_group_table *ls_port_groups,
>              const struct shash *meter_groups,
> +           const struct sampling_app_table *sampling_apps,
>              struct lflow_ref *lflow_ref)
>   {
>       const char *default_acl_action = default_acl_drop
> @@ -7031,6 +7453,9 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>           consider_acl(lflows, od, acl, has_stateful,
>                        meter_groups, ls_stateful_rec->max_acl_tier,
>                        &match, &actions, lflow_ref);
> +        build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
> +                               meter_groups, &match, &actions,
> +                               sampling_apps, lflow_ref);
>       }
>   
>       const struct ls_port_group *ls_pg =
> @@ -7047,6 +7472,9 @@ build_acls(const struct ls_stateful_record *ls_stateful_rec,
>                   consider_acl(lflows, od, acl, has_stateful,
>                                meter_groups, ls_stateful_rec->max_acl_tier,
>                                &match, &actions, lflow_ref);
> +                build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
> +                                       meter_groups, &match, &actions,
> +                                       sampling_apps, lflow_ref);
>               }
>           }
>       }
> @@ -7691,8 +8119,11 @@ build_lb_rules(struct lflow_table *lflows, struct ovn_lb_datapaths *lb_dps,
>   
>   static void
>   build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
> +               const struct sampling_app_table *sampling_apps,
>                  struct lflow_ref *lflow_ref)
>   {
> +    uint8_t sample_domain_est =
> +        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_EST_TRAFFIC);
>       struct ds actions = DS_EMPTY_INITIALIZER;
>   
>       /* Ingress LB, Ingress and Egress stateful Table (Priority 0): Packets are
> @@ -7709,8 +8140,13 @@ build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
>        * We always set ct_mark.blocked to 0 here as
>        * any packet that makes it this far is part of a connection we
>        * want to allow to continue. */
> -    ds_put_cstr(&actions, "ct_commit { ct_mark.blocked = 0; "
> -                          "ct_label.label = " REG_LABEL "; }; next;");
> +    ds_put_format(&actions,
> +                 "ct_commit { "
> +                    "ct_mark.blocked = 0; "
> +                    "ct_label.obs_domain_id = %"PRIu8 "; "
> +                    "ct_label.obs_point_id = " REG_OBS_POINT_ID_EST "; "
> +                  "}; next;",
> +                  sample_domain_est);
>       ovn_lflow_add(lflows, od, S_SWITCH_IN_STATEFUL, 100,
>                     REGBIT_CONNTRACK_COMMIT" == 1 && "
>                     REGBIT_ACL_LABEL" == 1",
> @@ -8718,6 +9154,7 @@ build_lswitch_lflows_pre_acl_and_acl(
>       struct ovn_datapath *od,
>       struct lflow_table *lflows,
>       const struct shash *meter_groups,
> +    const struct sampling_app_table *sampling_apps,
>       struct lflow_ref *lflow_ref)
>   {
>       ovs_assert(od->nbs);
> @@ -8725,7 +9162,7 @@ build_lswitch_lflows_pre_acl_and_acl(
>       build_pre_lb(od, meter_groups, lflows, lflow_ref);
>       build_pre_stateful(od, lflows, lflow_ref);
>       build_qos(od, lflows, lflow_ref);
> -    build_stateful(od, lflows, lflow_ref);
> +    build_stateful(od, lflows, sampling_apps, lflow_ref);
>       build_vtep_hairpin(od, lflows, lflow_ref);
>   }
>   
> @@ -15755,6 +16192,7 @@ build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
>                           const struct ovn_datapath *od,
>                           const struct ls_port_group_table *ls_pgs,
>                           const struct shash *meter_groups,
> +                        const struct sampling_app_table *sampling_apps,
>                           struct lflow_table *lflows)
>   {
>       build_ls_stateful_rec_pre_acls(ls_stateful_rec, od, ls_pgs, lflows,
> @@ -15764,7 +16202,7 @@ build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
>       build_acl_hints(ls_stateful_rec, od, lflows,
>                       ls_stateful_rec->lflow_ref);
>       build_acls(ls_stateful_rec, od, lflows, ls_pgs, meter_groups,
> -               ls_stateful_rec->lflow_ref);
> +               sampling_apps, ls_stateful_rec->lflow_ref);
>       build_lb_hairpin(ls_stateful_rec, od, lflows, ls_stateful_rec->lflow_ref);
>   }
>   
> @@ -15788,6 +16226,7 @@ struct lswitch_flow_build_info {
>       struct ds actions;
>       size_t thread_lflow_counter;
>       const char *svc_monitor_mac;
> +    const struct sampling_app_table *sampling_apps;
>   };
>   
>   /* Helper function to combine all lflow generation which is iterated by
> @@ -15802,7 +16241,8 @@ build_lswitch_and_lrouter_iterate_by_ls(struct ovn_datapath *od,
>   {
>       ovs_assert(od->nbs);
>       build_lswitch_lflows_pre_acl_and_acl(od, lsi->lflows,
> -                                         lsi->meter_groups, NULL);
> +                                         lsi->meter_groups,
> +                                         lsi->sampling_apps, NULL);
>   
>       build_fwd_group_lflows(od, lsi->lflows, NULL);
>       build_lswitch_lflows_admission_control(od, lsi->lflows, NULL);
> @@ -16079,6 +16519,7 @@ build_lflows_thread(void *arg)
>                       build_ls_stateful_flows(ls_stateful_rec, od,
>                                               lsi->ls_port_groups,
>                                               lsi->meter_groups,
> +                                            lsi->sampling_apps,
>                                               lsi->lflows);
>                   }
>               }
> @@ -16152,7 +16593,8 @@ build_lswitch_and_lrouter_flows(
>       const struct hmap *svc_monitor_map,
>       const struct hmap *bfd_connections,
>       const struct chassis_features *features,
> -    const char *svc_monitor_mac)
> +    const char *svc_monitor_mac,
> +    const struct sampling_app_table *sampling_apps)
>   {
>   
>       char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
> @@ -16186,6 +16628,7 @@ build_lswitch_and_lrouter_flows(
>               lsiv[index].svc_check_match = svc_check_match;
>               lsiv[index].thread_lflow_counter = 0;
>               lsiv[index].svc_monitor_mac = svc_monitor_mac;
> +            lsiv[index].sampling_apps = sampling_apps;
>               ds_init(&lsiv[index].match);
>               ds_init(&lsiv[index].actions);
>   
> @@ -16226,6 +16669,7 @@ build_lswitch_and_lrouter_flows(
>               .features = features,
>               .svc_check_match = svc_check_match,
>               .svc_monitor_mac = svc_monitor_mac,
> +            .sampling_apps = sampling_apps,
>               .match = DS_EMPTY_INITIALIZER,
>               .actions = DS_EMPTY_INITIALIZER,
>           };
> @@ -16298,6 +16742,7 @@ build_lswitch_and_lrouter_flows(
>                                      &od->nbs->header_.uuid));
>               build_ls_stateful_flows(ls_stateful_rec, od, lsi.ls_port_groups,
>                                       lsi.meter_groups,
> +                                    lsi.sampling_apps,
>                                       lsi.lflows);
>           }
>           stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
> @@ -16387,7 +16832,8 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
>                                       input_data->svc_monitor_map,
>                                       input_data->bfd_connections,
>                                       input_data->features,
> -                                    input_data->svc_monitor_mac);
> +                                    input_data->svc_monitor_mac,
> +                                    input_data->sampling_apps);
>   
>       if (parallelization_state == STATE_INIT_HASH_SIZES) {
>           parallelization_state = STATE_USE_PARALLELIZATION;
> @@ -16811,6 +17257,7 @@ lflow_handle_ls_stateful_changes(struct ovsdb_idl_txn *ovnsb_txn,
>           build_ls_stateful_flows(ls_stateful_rec, od,
>                                   lflow_input->ls_port_groups,
>                                   lflow_input->meter_groups,
> +                                lflow_input->sampling_apps,
>                                   lflows);
>   
>           /* Sync the new flows to SB. */
> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
> index b06b09ac5f..ca97a9b703 100644
> --- a/northd/ovn-northd.8.xml
> +++ b/northd/ovn-northd.8.xml
> @@ -899,6 +899,32 @@
>           next(pipeline=egress,table=5);}</code> action for SCTP associations.
>         </li>
>   
> +      <li>
> +        For each ACL with sample_new configured a priority 1100 flow is
> +        installed that additionally matches on the saved observation_point_id
> +        value.  This flow also generates a <code>sample()</code> action along
> +        with the usual action processing for the type of ACL.
> +      </li>
> +
> +      <li>
> +        For each ACL with sample_est configured a priority 1200 flow is
> +        installed that additionally matches on the saved observation_point_id
> +        value for established traffic in the original direction.  This flow
> +        also generates a <code>sample()</code> action along with the usual
> +        action processing for the type of ACL.
> +      </li>
> +
> +      <li>
> +        For each ACL with sample_est configured a priority 1200 flow is
> +        installed that additionally matches on the saved observation_point_id
> +        value for established traffic in the reply direction.  This flow also
> +        generates a <code>sample()</code> action along with the usual action
> +        processing for the type of ACL.  Note: this flow is installed in the
> +        opposite pipeline (in the ingress pipeline for ACLs applied in the
> +        egress direction and in the egress pipeline for ACLs applied in the
> +        ingress direction).
> +      </li>
> +
>         <li>
>           If any ACLs have tiers configured on them, then three priority 500
>           flows are installed. If the current tier counter is 0, 1, or 2, then
> diff --git a/ovn-nb.ovsschema b/ovn-nb.ovsschema
> index e131a4c083..6d26f468c7 100644
> --- a/ovn-nb.ovsschema
> +++ b/ovn-nb.ovsschema
> @@ -1,7 +1,7 @@
>   {
>       "name": "OVN_Northbound",
> -    "version": "7.4.0",
> -    "cksum": "1498303893 36355",
> +    "version": "7.5.0",
> +    "cksum": "102122798 38439",
>       "tables": {
>           "NB_Global": {
>               "columns": {
> @@ -30,6 +30,40 @@
>                   "ipsec": {"type": "boolean"}},
>               "maxRows": 1,
>               "isRoot": true},
> +        "Sample_Collector": {
> +            "columns": {
> +                "name": {"type": "string"},
> +                "probability": {"type": {"key": {
> +                    "type": "integer",
> +                    "minInteger": 0,
> +                    "maxInteger": 65535}}},
> +                "set_id": {"type": {"key": {
> +                    "type": "integer",
> +                    "minInteger": 0,
> +                    "maxInteger": 4294967295}}},
> +                "external_ids": {"type": {"key": "string", "value": "string",
> +                                          "min": 0, "max": "unlimited"}}
> +            },
> +            "indexes": [["name"]],
> +            "isRoot": true
> +        },
> +        "Sample": {
> +            "columns": {
> +                "collectors": {"type": {"key": {"type": "uuid",
> +                                                "refTable": "Sample_Collector",
> +                                                "refType": "strong"},
> +                                        "min": 0,
> +                                        "max": "unlimited"}},
> +                "metadata": {"type": {"key": {"type": "integer",
> +                                              "minInteger": 1,
> +                                              "maxInteger": 4294967295},
> +                                      "min": 1, "max":1}},
> +                "external_ids": {"type": {"key": "string", "value": "string",
> +                                          "min": 0, "max": "unlimited"}}
> +            },
> +            "indexes": [["metadata"]],
> +            "isRoot": true
> +        },
>           "Copp": {
>               "columns": {
>                   "name": {"type": "string"},
> @@ -275,6 +309,14 @@
>                   "tier": {"type": {"key": {"type": "integer",
>                                             "minInteger": 0,
>                                             "maxInteger": 3}}},
> +                "sample_new": {"type": {"key": {"type": "uuid",
> +                                                "refTable": "Sample",
> +                                                "refType": "strong"},
> +                                        "min": 0, "max": 1}},
> +                "sample_est": {"type": {"key": {"type": "uuid",
> +                                                "refTable": "Sample",
> +                                                "refType": "strong"},
> +                                        "min": 0, "max": 1}},
>                   "options": {
>                        "type": {"key": "string",
>                                 "value": "string",
> diff --git a/ovn-nb.xml b/ovn-nb.xml
> index f2a8b5c076..480144ddc1 100644
> --- a/ovn-nb.xml
> +++ b/ovn-nb.xml
> @@ -490,6 +490,48 @@
>   
>     </table>
>   
> +  <table name="Sample_Collector" title="Sample_Collector">
> +    <column name="name">
> +      Sample collector name.
> +    </column>
> +    <column name="probability">
> +      Sampling probability for this collector.  It must be an integer number
> +      between 0 and 65535.  A value of 0 corresponds to no packets being
> +      sampled while a value of 65535 corresponds to all packets being sampled.
> +    </column>
> +    <column name="set_id">
> +      The 32-bit integer identifier of the set of of collectors to send
> +      packets to. See Flow_Sample_Collector_Set Table in ovs-vswitchd's
> +      database schema.
> +    </column>
> +    <column name="external_ids">
> +      See <em>External IDs</em> at the beginning of this document.
> +    </column>
> +  </table>
> +
> +  <table name="Sample" title="Sample">
> +    <p>
> +      This table describes a Sampling configuration. Entries in other tables
> +      might be associated with Sample entries to indicate how the sample
> +      should be generated.
> +
> +      For an example, see <ref table="ACL"/>.
> +    </p>
> +    <column name="collectors">
> +      A list of references to <ref table="Sample_Collector"/> records to be
> +      used when generating samples (e.g., IPFIX).  A sample can be sent to
> +      multiple collectors simultaneously.
> +    </column>
> +    <column name="metadata">
> +      Will be used as Observation Point ID in every sample.  The Observation
> +      Domain ID will be generated by ovn-northd and includes the logical
> +      datapath key as the least significant 24 bits and the sampling
> +      application type (e.g., drop debugging) as the 8 most significant bits.
> +    </column>
> +    <column name="external_ids">
> +      See <em>External IDs</em> at the beginning of this document.
> +    </column>
> +  </table>
>     <table name="Copp" title="Control plane protection">
>       <p>
>         This table is used to define control plane protection policies, i.e.,
> @@ -2500,6 +2542,27 @@ or
>         </column>
>       </group>
>   
> +    <column name="sample_new">
> +      <p>
> +        The entry in the <ref table="Sample"/> table to use for sampling for
> +        new sessions matched by this ACL.  In case the ACL is stateless
> +        this is used for sampling all traffic matched by the ACL.
> +
> +        NOTE: ACLs with action <code>pass</code> currently do not support
> +        sampling of traffic on new sessions.
> +      </p>
> +    </column>
> +
> +    <column name="sample_est">
> +      <p>
> +        The entry in the <ref table="Sample"/> table to use for sampling for
> +        established/related sessions matched by this ACL.
> +
> +        NOTE: ACLs with action <code>pass</code> currently do not support
> +        sampling of traffic on established sessions.
> +      </p>
> +    </column>
> +
>       <group title="Common Columns">
>         <column name="options">
>           This column provides general key/value settings. The supported
> diff --git a/tests/atlocal.in b/tests/atlocal.in
> index 32d1c374ea..29e1bb2982 100644
> --- a/tests/atlocal.in
> +++ b/tests/atlocal.in
> @@ -196,6 +196,12 @@ find_command bfdd-beacon
>   # Set HAVE_ARPING
>   find_command arping
>   
> +# Set HAVE_NFCAPD
> +find_command nfcapd
> +
> +# Set HAVE_NFDUMP
> +find_command nfdump
> +
>   # Turn off proxies.
>   unset http_proxy
>   unset https_proxy
> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
> index 47ada5c70e..863d85daf4 100644
> --- a/tests/ovn-macros.at
> +++ b/tests/ovn-macros.at
> @@ -1049,6 +1049,10 @@ ovn_strip_lflows() {
>        sed 's/table=[[0-9]]\{1,2\}\s\?/table=??/g' | sort
>   }
>   
> +ovn_strip_collector_set() {
> +    sed 's/collector_set=[[0-9]]*,\?/collector_set=??,/g'
> +}
> +
>   OVS_END_SHELL_HELPERS
>   
>   m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
> diff --git a/tests/ovn-nbctl.at b/tests/ovn-nbctl.at
> index 31de309215..0905a021eb 100644
> --- a/tests/ovn-nbctl.at
> +++ b/tests/ovn-nbctl.at
> @@ -2802,6 +2802,26 @@ check_row_count nb:ACL 0
>   
>   dnl ---------------------------------------------------------------------
>   
> +OVN_NBCTL_TEST([acl_sampling], [ACL sampling operations], [
> +check ovn-nbctl ls-add ls
> +sample1=$(ovn-nbctl create sample metadata=4301)
> +sample2=$(ovn-nbctl create sample metadata=4302)
> +check_row_count nb:Sample 2
> +
> +check ovn-nbctl --sample-new=$sample1 acl-add ls from-lport 1 1 allow-related
> +check_column "$sample1" nb:ACL sample_new priority=1
> +
> +check ovn-nbctl --sample-est=$sample2 acl-add ls from-lport 2 1 allow-related
> +check_column "" nb:ACL sample_new priority=2
> +check_column "$sample2" nb:ACL sample_est priority=2
> +
> +check ovn-nbctl --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 3 1 allow-related
> +check_column "$sample1" nb:ACL sample_new priority=3
> +check_column "$sample2" nb:ACL sample_est priority=3
> +])
> +
> +dnl ---------------------------------------------------------------------
> +
>   AT_SETUP([ovn-nbctl - daemon retry connection])
>   OVN_NBCTL_TEST_START daemon
>   pid=$(cat ovsdb-server.pid)
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 3fb883225a..a6843ce998 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -4604,7 +4604,7 @@ check_stateful_flows() {
>       AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>       AT_CHECK_UNQUOTED([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
> @@ -4628,7 +4628,7 @@ check_stateful_flows() {
>       AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   }
>   
> @@ -4670,7 +4670,7 @@ AT_CHECK([grep "ls_in_lb " sw0flows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
> @@ -4691,7 +4691,7 @@ AT_CHECK([grep "ls_out_pre_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # LB with event=false and reject=false
> @@ -4720,23 +4720,23 @@ ovn-sbctl dump-flows sw0 > sw0flows
>   AT_CAPTURE_FILE([sw0flows])
>   
>   AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>   ])
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>   ])
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # Add new ACL without label
> @@ -4747,27 +4747,27 @@ ovn-sbctl dump-flows sw0 > sw0flows
>   AT_CAPTURE_FILE([sw0flows])
>   
>   AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>     table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
> -  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>     table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
>   ])
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>     table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
> -  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
> +  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
>     table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
>   ])
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   # Delete new ACL with label
> @@ -4784,7 +4784,7 @@ AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0]
>   AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
> @@ -4794,7 +4794,7 @@ AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0
>   AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
>     table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   AT_CLEANUP
>   ])
> @@ -4822,7 +4822,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls from-lport 1
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AS_BOX([from-lport --apply-after-lb allow-related ACL])
> @@ -4830,7 +4830,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --apply-after-lb --label=1234 acl-add
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AS_BOX([to-lport allow-related ACL])
> @@ -4838,7 +4838,7 @@ check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls to-lport 1 ip
>   
>   dnl Check that the label is committed to conntrack in the ingress pipeline
>   AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_out_stateful -A 2 | grep commit], [0], [dnl
> -    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
>   ])
>   
>   AT_CLEANUP
> @@ -7658,7 +7658,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AS_BOX([Remove and add the ACLs back with the apply-after-lb option])
> @@ -7713,7 +7713,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AS_BOX([Remove and add the ACLs back with a few ACLs with apply-after-lb option])
> @@ -7768,7 +7768,7 @@ AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
>   AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
>     table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
>     table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
> -  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
> +  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
>   ])
>   
>   AT_CLEANUP
> @@ -12491,6 +12491,198 @@ AT_CHECK([ovn-sbctl lflow-list | grep ls_in_l2_unknown.*sample | ovn_strip_lflow
>   AT_CLEANUP
>   ])
>   
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([ACL Sampling])
> +AT_KEYWORDS([acl])
> +
> +ovn_start
> +
> +collector1=$(ovn-nbctl create Sample_Collector name=test-collector1 probability=65535 set_id=1)
> +collector2=$(ovn-nbctl create Sample_Collector name=test-collector2 probability=65535 set_id=2)
> +check_row_count nb:Sample_Collector 2
> +
> +ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" id="42"
> +ovn-nbctl create Sampling_App name="acl-est-traffic-sampling" id="43"
> +check_row_count nb:Sampling_App 2
> +
> +sample1=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=4301)
> +sample2=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=4302)
> +check_row_count nb:Sample 2
> +
> +check ovn-nbctl                               \
> +  -- ls-add ls                                \
> +  -- lsp-add ls lsp1                          \
> +  -- lsp-set-addresses lsp1 00:00:00:00:00:01 \
> +  -- lsp-add ls lsp2                          \
> +  -- lsp-set-addresses lsp2 00:00:00:00:00:02
> +check ovn-nbctl --wait=sb sync
> +
> +base_flow="inport == \"lsp1\" && eth.src == 00:00:00:00:00:01 && eth.dst == 00:00:00:00:00:02 && ip4.src == 42.42.42.1 && ip4.dst == 42.42.42.2"
> +
> +AS_BOX([from-lport ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_action -e ls_in_acl_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_action   ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_action   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
> +    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
> +])
> +
> +AS_BOX([from-lport ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --sample-new=$sample1 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_action -e ls_in_acl_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_action   ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 0;
> +])
> +
> +AS_BOX([from-lport-after-lb ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --apply-after-lb --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_action -e ls_in_acl_after_lb_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_after_lb_action), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_after_lb_action), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
> +    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
> +])
> +
> +AS_BOX([from-lport-after-lb ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --apply-after-lb --sample-new=$sample1 acl-add ls from-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_action -e ls_in_acl_after_lb_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_after_lb_action), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 0;
> +])
> +
> +AS_BOX([to-lport ACL sampling (new, est)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --sample-new=$sample1 --sample-est=$sample2 acl-add ls to-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_action -e ls_out_acl_eval -e ls_in_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_in_acl_action   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_out_acl_action  ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; };
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace estasblished connections.
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 4302;
> +    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
> +    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
> +])
> +
> +AS_BOX([to-lport ACL sampling (new)])
> +check ovn-nbctl acl-del ls
> +check ovn-nbctl --wait=sb --sample-new=$sample1 acl-add ls to-lport 1 "1" allow-related
> +AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_action -e ls_out_acl_eval -e ls_in_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
> +  table=??(ls_out_acl_action  ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
> +])
> +
> +dnl Trace new connections.
> +flow="$base_flow"
> +AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
> +    ct_commit { ct_mark.blocked = 0; };
> +    reg9 = 0;
> +    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
> +    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
> +])
> +
> +dnl Trace established connections (no point id was committed in the label in
> +dnl the original direction).
> +flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
> +AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
> +    reg9 = 0;
> +])
> +
> +AT_CLEANUP
> +])
> +
>   OVN_FOR_EACH_NORTHD_NO_HV([
>   AT_SETUP([NAT with match])
>   ovn_start
> diff --git a/tests/ovn.at b/tests/ovn.at
> index 877d0dfdba..df3c1f942d 100644
> --- a/tests/ovn.at
> +++ b/tests/ovn.at
> @@ -329,6 +329,9 @@ ct.trk = ct_state[5]
>   ct_label = NXM_NX_CT_LABEL
>   ct_label.ecmp_reply_eth = ct_label[32..79]
>   ct_label.label = ct_label[96..127]
> +ct_label.obs_domain_id = ct_label[88..95]
> +ct_label.obs_point_id = ct_label[96..127]
> +ct_label.obs_unused = ct_label[0..87]
>   ct_mark = NXM_NX_CT_MARK
>   ct_mark.blocked = ct_mark[0]
>   ct_mark.ecmp_reply_port = ct_mark[16..31]
> @@ -1355,6 +1358,12 @@ ct_commit(ct_label=18446744073709551615);
>   ct_commit(ct_label=18446744073709551616);
>       Syntax error at `(' expecting `;'.
>   
> +# Observation domain and point id.
> +ct_commit {ct_label.obs_domain_id = 42; ct_label.obs_point_id = reg2; };
> +    formats as ct_commit { ct_label.obs_domain_id = 42; ct_label.obs_point_id = reg2; };
> +    encodes as ct(commit,zone=NXM_NX_REG13[[0..15]],exec(set_field:0x2a0000000000000000000000/0xff0000000000000000000000->ct_label,move:NXM_NX_XXREG0[[32..63]]->NXM_NX_CT_LABEL[[96..127]]))
> +    has prereqs ip
> +
>   ct_mark = 12345
>       Field ct_mark is not modifiable.
>   ct_mark.blocked = 1/1
> diff --git a/tests/system-common-macros.at b/tests/system-common-macros.at
> index 691c271a3a..c595561734 100644
> --- a/tests/system-common-macros.at
> +++ b/tests/system-common-macros.at
> @@ -237,6 +237,17 @@ m4_define([STRIP_MONITOR_CSUM], [grep "csum:" | sed 's/csum:.*/csum: <skip>/'])
>   m4_define([FORMAT_CT],
>       [[grep -F "dst=$1," | sed -e 's/port=[0-9]*/port=<cleared>/g' -e 's/id=[0-9]*/id=<cleared>/g' -e 's/state=[0-9_A-Z]*/state=<cleared>/g' | sort | uniq]])
>   
> +# DAEMONIZE([command], [pidfile])
> +#
> +# Run 'command' as a background process and record its pid to 'pidfile' to
> +# allow cleanup on exit.
> +#
> +m4_define([DAEMONIZE],
> +   [$1 & echo $! > $2
> +     echo "kill \`cat $2\`" >> cleanup
> +   ]
> +)
> +
>   # NETNS_DAEMONIZE([namespace], [command], [pidfile])
>   #
>   # Run 'command' as a background process within 'namespace' and record its pid
> diff --git a/tests/system-ovn.at b/tests/system-ovn.at
> index ddb3d14e92..14f12524c0 100644
> --- a/tests/system-ovn.at
> +++ b/tests/system-ovn.at
> @@ -13022,3 +13022,152 @@ OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port patch-.*/d
>   /connection dropped.*/d"])
>   AT_CLEANUP
>   ])
> +
> +OVN_FOR_EACH_NORTHD([
> +AT_SETUP([ovn -- ACL Sampling])
> +AT_SKIP_IF([test $HAVE_TCPDUMP = no])
> +AT_SKIP_IF([test $HAVE_NFCAPD = no])
> +AT_SKIP_IF([test $HAVE_NFDUMP = no])
> +AT_KEYWORDS([ACL])
> +
> +CHECK_CONNTRACK()
> +CHECK_CONNTRACK_NAT()
> +ovn_start
> +OVS_TRAFFIC_VSWITCHD_START()
> +ADD_BR([br-int])
> +
> +dnl Set external-ids in br-int needed for ovn-controller
> +check ovs-vsctl \
> +        -- set Open_vSwitch . external-ids:system-id=hv1 \
> +        -- set Open_vSwitch . external-ids:ovn-remote=unix:$ovs_base/ovn-sb/ovn-sb.sock \
> +        -- set Open_vSwitch . external-ids:ovn-encap-type=geneve \
> +        -- set Open_vSwitch . external-ids:ovn-encap-ip=169.0.0.1 \
> +        -- set bridge br-int fail-mode=secure other-config:disable-in-band=true
> +
> +dnl Start ovn-controller
> +start_daemon ovn-controller
> +
> +dnl Logical network:
> +dnl 1 logical switch connetected to one logical router
> +dnl 3 UDP load balancers (ports 1000, 2000, 3000)
> +dnl 2 VIFs
> +
> +check ovn-nbctl                                                  \
> +    -- lr-add rtr                                                \
> +    -- lrp-add rtr rtr-ls 00:00:00:00:01:00 42.42.42.1/24        \
> +    -- ls-add ls                                                 \
> +    -- lsp-add ls ls-rtr                                         \
> +    -- lsp-set-addresses ls-rtr 00:00:00:00:01:00                \
> +    -- lsp-set-type ls-rtr router                                \
> +    -- lsp-set-options ls-rtr router-port=rtr-ls                 \
> +    -- lsp-add ls vm1 -- lsp-set-addresses vm1 00:00:00:00:00:01 \
> +    -- lsp-add ls vm2 -- lsp-set-addresses vm2 00:00:00:00:00:02 \
> +    -- lb-add lb1 43.43.43.43:1000 42.42.42.3:1000 udp           \
> +    -- lb-add lb2 43.43.43.43:2000 42.42.42.3:2000 udp           \
> +    -- lb-add lb3 43.43.43.43:3000 42.42.42.3:3000 udp           \
> +    -- ls-lb-add ls lb1                                          \
> +    -- ls-lb-add ls lb2                                          \
> +    -- ls-lb-add ls lb3
> +
> +ADD_NAMESPACES(vm1)
> +ADD_VETH(vm1, vm1, br-int, "42.42.42.2/24", "00:00:00:00:00:01", "42.42.42.1")
> +
> +ADD_NAMESPACES(vm2)
> +ADD_VETH(vm2, vm2, br-int, "42.42.42.3/24", "00:00:00:00:00:02", "42.42.42.1")
> +
> +collector1=$(ovn-nbctl create Sample_Collector name=test-collector1 probability=65535 set_id=1)
> +collector2=$(ovn-nbctl create Sample_Collector name=test-collector2 probability=65535 set_id=2)
> +check_row_count nb:Sample_Collector 2
> +
> +ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" id="42"
> +ovn-nbctl create Sampling_App name="acl-est-traffic-sampling" id="43"
> +check_row_count nb:Sampling_App 2
> +
> +sample1=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=1001)
> +sample2=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=1002)
> +sample3=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=2001)
> +sample4=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=2002)
> +sample5=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=3001)
> +sample6=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=3002)
> +check_row_count nb:Sample 6
> +
> +dnl Create ACLs that match the 3 types of traffic in all 3 possible stages:
> +dnl from-lport, from-lport-after-lb, to-lport
> +check ovn-nbctl --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "inport == \"vm1\" && udp.dst == 1000" allow-related
> +check ovn-nbctl --apply-after-lb --sample-new=$sample3 --sample-est=$sample4 acl-add ls from-lport 1 "inport == \"vm1\" && udp.dst == 2000" allow-related
> +check ovn-nbctl --sample-new=$sample5 --sample-est=$sample6 acl-add ls to-lport 1 "outport == \"vm2\" && udp.dst == 3000" allow-related
> +
> +dnl Wait for ovn-controller to catch up.
> +wait_for_ports_up
> +check ovn-nbctl --wait=hv sync
> +
> +dnl Start an IPFIX collector.
> +DAEMONIZE([nfcapd -B 1024000 -w . -p 4242 2> collector.err], [collector.pid])
> +
> +dnl Wait for the collector to be up.
> +OVS_WAIT_UNTIL([grep -q 'Startup nfcapd.' collector.err])
> +
> +dnl Configure the OVS flow sample collector.
> +ovs-vsctl --id=@br get Bridge br-int \
> +    -- --id=@ipfix create IPFIX targets=\"127.0.0.1:4242\" template_interval=1 \
> +    -- --id=@cs create Flow_Sample_Collector_Set id=1 bridge=@br ipfix=@ipfix
> +
> +dnl And wait for it to be up and running.
> +OVS_WAIT_UNTIL([ovs-ofctl dump-ipfix-flow br-int | grep -q '1 ids'])
> +
> +dnl Start UDP echo server on vm2.
> +NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 1000], [nc-vm2-1000.pid])
> +NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 2000], [nc-vm2-2000.pid])
> +NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 3000], [nc-vm2-3000.pid])
> +
> +dnl Send traffic to the UDP LB1 (hits the from-lport ACL).
> +NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 1000])
> +
> +dnl Send traffic to the UDP LB1 (hits the from-lport after-lb ACL).
> +NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 2000])
> +
> +dnl Send traffic to the UDP LB1 (hits the from-lport ACL).
> +NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 3000])
> +
> +dnl Wait until OVS sampled all expected packets (2 data packets + 1 ICMP
> +dnl port unreachable error on each session).
> +OVS_WAIT_UNTIL([ovs-ofctl dump-ipfix-flow br-int | grep -q 'sampled pkts=9'])
> +
> +dnl Check the IPFIX samples.
> +kill $(cat collector.pid)
> +OVS_WAIT_WHILE([kill -0 $(cat collector.pid) 2>/dev/null])
> +
> +dnl Can't match on observation domain ID due to the followig fix not being
> +dnl available in any released version of nfdump:
> +dnl https://github.com/phaag/nfdump/issues/544
> +dnl
> +dnl Only match on the point ID.
> +AT_CHECK([nfdump -r nfcapd.* -o json | grep observationPointID | awk '{$1=$1;print}' | sort], [0], [dnl
> +"observationPointID" : 1001,
> +"observationPointID" : 1002,
> +"observationPointID" : 1002,
> +"observationPointID" : 2001,
> +"observationPointID" : 2002,
> +"observationPointID" : 2002,
> +"observationPointID" : 3001,
> +"observationPointID" : 3002,
> +"observationPointID" : 3002,
> +])
> +
> +OVS_APP_EXIT_AND_WAIT([ovn-controller])
> +
> +as ovn-sb
> +OVS_APP_EXIT_AND_WAIT([ovsdb-server])
> +
> +as ovn-nb
> +OVS_APP_EXIT_AND_WAIT([ovsdb-server])
> +
> +as northd
> +OVS_APP_EXIT_AND_WAIT([ovn-northd])
> +
> +as
> +OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port patch-.*/d
> +/connection dropped.*/d"])
> +
> +AT_CLEANUP
> +])
> diff --git a/utilities/containers/fedora/Dockerfile b/utilities/containers/fedora/Dockerfile
> index 078180cff3..4dce1e32b4 100755
> --- a/utilities/containers/fedora/Dockerfile
> +++ b/utilities/containers/fedora/Dockerfile
> @@ -27,6 +27,7 @@ RUN dnf -y update \
>           libcap-ng-devel \
>           libtool \
>           net-tools \
> +        nfdump \
>           ninja-build \
>           nmap-ncat \
>           numactl-devel \
> diff --git a/utilities/containers/ubuntu/Dockerfile b/utilities/containers/ubuntu/Dockerfile
> index 7cf0751225..073afa8764 100755
> --- a/utilities/containers/ubuntu/Dockerfile
> +++ b/utilities/containers/ubuntu/Dockerfile
> @@ -33,6 +33,7 @@ RUN apt update -y \
>           llvm-dev \
>           ncat \
>           net-tools \
> +        nfdump \
>           ninja-build \
>           python3-dev \
>           python3-pip \
> diff --git a/utilities/ovn-nbctl.8.xml b/utilities/ovn-nbctl.8.xml
> index e2657ca02c..e1e5b681e1 100644
> --- a/utilities/ovn-nbctl.8.xml
> +++ b/utilities/ovn-nbctl.8.xml
> @@ -399,7 +399,7 @@
>         must be either <code>switch</code> or <code>port-group</code>.
>       </p>
>       <dl>
> -        <dt>[<code>--type=</code>{<code>switch</code> | <code>port-group</code>}] [<code>--log</code>] [<code>--meter=</code><var>meter</var>] [<code>--severity=</code><var>severity</var>] [<code>--name=</code><var>name</var>] [<code>--label=</code><var>label</var>] [<code>--may-exist</code>] [<code>--apply-after-lb</code>] [<code>--tier</code>] <code>acl-add</code> <var>entity</var> <var>direction</var> <var>priority</var> <var>match</var> <var>verdict</var></dt>
> +        <dt>[<code>--type=</code>{<code>switch</code> | <code>port-group</code>}] [<code>--log</code>] [<code>--meter=</code><var>meter</var>] [<code>--severity=</code><var>severity</var>] [<code>--name=</code><var>name</var>] [<code>--label=</code><var>label</var>] [<code>--sample-new=</code><var>sample</var>] [<code>--sample-est=</code><var>sample</var>] [<code>--may-exist</code>] [<code>--apply-after-lb</code>] [<code>--tier</code>] <code>acl-add</code> <var>entity</var> <var>direction</var> <var>priority</var> <var>match</var> <var>verdict</var></dt>
>         <dd>
>           <p>
>             Adds the specified ACL to <var>entity</var>.  <var>direction</var>
> @@ -424,6 +424,12 @@
>             names a meter configured by <code>meter-add</code>.
>           </p>
>   
> +        <p>
> +          The <code>--sample-new</code> (and optionally
> +          <code>--sample-est</code>) enable ACL sampling. A valid uuid of a
> +          row of the <ref table="Sample"/> table must be provided.
> +        </p>
> +
>           <p>
>             The <code>--apply-after-lb</code> option sets
>             <code>apply-after-lb=true</code> in the <code>options</code> column
> diff --git a/utilities/ovn-nbctl.c b/utilities/ovn-nbctl.c
> index c37cc010ce..816f26cf4b 100644
> --- a/utilities/ovn-nbctl.c
> +++ b/utilities/ovn-nbctl.c
> @@ -2308,6 +2308,11 @@ nbctl_pre_acl(struct ctl_context *ctx)
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_match);
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_options);
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_tier);
> +    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_new);
> +    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_est);
> +
> +    ovsdb_idl_add_table(ctx->idl, &nbrec_table_sample_collector);
> +    ovsdb_idl_add_table(ctx->idl, &nbrec_table_sample);
>   }
>   
>   static void
> @@ -2321,6 +2326,8 @@ nbctl_pre_acl_list(struct ctl_context *ctx)
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_severity);
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_meter);
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_label);
> +    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_new);
> +    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_est);
>       ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_options);
>   }
>   
> @@ -2372,6 +2379,8 @@ nbctl_acl_add(struct ctl_context *ctx)
>       const char *severity = shash_find_data(&ctx->options, "--severity");
>       const char *name = shash_find_data(&ctx->options, "--name");
>       const char *meter = shash_find_data(&ctx->options, "--meter");
> +    const char *sample_new = shash_find_data(&ctx->options, "--sample-new");
> +    const char *sample_est = shash_find_data(&ctx->options, "--sample-est");
>       if (log || severity || name || meter) {
>           nbrec_acl_set_log(acl, true);
>       }
> @@ -2388,6 +2397,38 @@ nbctl_acl_add(struct ctl_context *ctx)
>       if (meter) {
>           nbrec_acl_set_meter(acl, meter);
>       }
> +    if (sample_new) {
> +        const struct nbrec_sample *sample_elem = NULL;
> +        struct uuid sample_uuid;
> +
> +        if (uuid_from_string(&sample_uuid, sample_new)) {
> +            sample_elem = nbrec_sample_get_for_uuid(ctx->idl, &sample_uuid);
> +            if (!sample_elem) {
> +                ctl_error(ctx, "sample record not found");
> +                return;
> +            }
> +            nbrec_acl_set_sample_new(acl, sample_elem);
> +        } else {
> +            ctl_error(ctx, "a valid uuid must be provided");
> +            return;
> +        }
> +    }
> +    if (sample_est) {
> +        const struct nbrec_sample *sample_elem = NULL;
> +        struct uuid sample_uuid;
> +
> +        if (uuid_from_string(&sample_uuid, sample_est)) {
> +            sample_elem = nbrec_sample_get_for_uuid(ctx->idl, &sample_uuid);
> +            if (!sample_elem) {
> +                ctl_error(ctx, "sample record not found");
> +                return;
> +            }
> +            nbrec_acl_set_sample_est(acl, sample_elem);
> +        } else {
> +            ctl_error(ctx, "a valid uuid must be provided");
> +            return;
> +        }

The error messages for --sample-new and --sample-est are identical. If 
both options are provided, but only one of them is invalid, the error 
message doesn't help the user to know which of the two options was 
invalid. I suggest changing the error messages to be more specific:

"--sample-new record not found."
"a valid --sample-est UUID must be provided."

> +    }
>   
>       /* Set the ACL label */
>       const char *label = shash_find_data(&ctx->options, "--label");
> @@ -7915,7 +7956,7 @@ static const struct ctl_command_syntax nbctl_commands[] = {
>       { "acl-add", 5, 6, "{SWITCH | PORTGROUP} DIRECTION PRIORITY MATCH ACTION",
>         nbctl_pre_acl, nbctl_acl_add, NULL,
>         "--log,--may-exist,--type=,--name=,--severity=,--meter=,--label=,"
> -      "--apply-after-lb,--tier=", RW },
> +      "--apply-after-lb,--tier=,--sample-new=,--sample-est=", RW },
>       { "acl-del", 1, 4, "{SWITCH | PORTGROUP} [DIRECTION [PRIORITY MATCH]]",
>         nbctl_pre_acl, nbctl_acl_del, NULL, "--type=,--tier=", RW },
>       { "acl-list", 1, 1, "{SWITCH | PORTGROUP}",
Dumitru Ceara July 31, 2024, 7:33 a.m. UTC | #2
Thanks for the review, Mark!

On 7/30/24 23:29, Mark Michelson wrote:
> Based on discussions I've had with devs, this change can result in quite
> a lot of flows being installed. There is an issue [1] that seeks to
> optimize this, but unfortunately this optimization revealed some other
> issues, including in the kernel. Therefore, the optimization is not
> viable to get in for ovn24.09.
> 

It seems Ales and I managed to find a way to make the optimization [1]
(using sample action with observation domain/point ids from fields) work
without having to change the (arguably buggy) OVN/kernel conntrack
related behavior.

I'll post v4 that incorporates commits from Ales to implement [1] once
CI passes in my fork.

> Since ACL sampling is opt-in, I'm of the opinion that this feature is
> acceptable to get merged, even if the suggested optimization is not
> acceptable. Opting in comes with the caveat that you may see quite a
> large number of flows installed per as a result. Of course, since
> sampling is configured per-ACL, it's also possible to only opt in on
> certain ACLs instead of all of them, thus not resulting in quite the
> same flow explosion. It may be worth noting in the ovn-nb.xml
> documentation this particular risk.
> 
> I had some potential optimization-related suggestions initially as part
> of this review, but since optimization is something that is being
> delayed anyway, I think this can be merged as-is, with one suggestion
> from me in the ovn-nbctl change. See below.
> 

Thanks, I'll include your suggestion in v4.

> Acked-by: Mark Michelson <mmichels@redhat.com>
> 

As this patch changes a bit in v4, I won't add your ack for now.  It
would be great if you could have another look once the new revision is
available.

Thanks,
Dumitru

> [1] https://issues.redhat.com/browse/FDP-709
>
diff mbox series

Patch

diff --git a/NEWS b/NEWS
index fcf182bc02..7899c623f2 100644
--- a/NEWS
+++ b/NEWS
@@ -41,6 +41,9 @@  Post v24.03.0
   - The NB_Global.debug_drop_domain_id configured value is now overridden by
     the ID associated with the Sampling_App record created for drop sampling
     (Sampling_App.name configured to be "drop-sampling").
+  - Add support for ACL sampling through the new Sample_Collector and Sample
+    tables.  Sampling is supported for both traffic that creates new
+    connections and for traffic that is part of an existing connection.
 
 OVN v24.03.0 - 01 Mar 2024
 --------------------------
diff --git a/lib/logical-fields.c b/lib/logical-fields.c
index 4acf8a677e..05292eb4aa 100644
--- a/lib/logical-fields.c
+++ b/lib/logical-fields.c
@@ -175,6 +175,12 @@  ovn_init_symtab(struct shash *symtab)
                                     WR_CT_COMMIT);
     expr_symtab_add_subfield_scoped(symtab, "ct_label.label", NULL,
                                     "ct_label[96..127]", WR_CT_COMMIT);
+    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_point_id", NULL,
+                                    "ct_label[96..127]", WR_CT_COMMIT);
+    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_domain_id", NULL,
+                                    "ct_label[88..95]", WR_CT_COMMIT);
+    expr_symtab_add_subfield_scoped(symtab, "ct_label.obs_unused", NULL,
+                                    "ct_label[0..87]", WR_CT_COMMIT);
 
     expr_symtab_add_field(symtab, "ct_state", MFF_CT_STATE, NULL, false);
 
diff --git a/northd/northd.c b/northd/northd.c
index 901b9e9cd1..80b9164460 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -50,6 +50,7 @@ 
 #include "en-lr-nat.h"
 #include "en-lr-stateful.h"
 #include "en-ls-stateful.h"
+#include "en-sampling-app.h"
 #include "lib/ovn-parallel-hmap.h"
 #include "ovn/actions.h"
 #include "ovn/features.h"
@@ -184,8 +185,10 @@  static bool vxlan_mode;
 
 #define REG_ORIG_TP_DPORT_ROUTER   "reg9[16..31]"
 
-/* Register used for setting a label for ACLs in a Logical Switch. */
-#define REG_LABEL "reg3"
+/* Registers used for pasing observability information for switches:
+ * domain and point ID. */
+#define REG_OBS_POINT_ID_NEW "reg3"
+#define REG_OBS_POINT_ID_EST "reg9"
 
 /* Register used for temporarily store ECMP eth.src to avoid masked ct_label
  * access. It doesn't really occupy registers because the content of the
@@ -209,13 +212,13 @@  static bool vxlan_mode;
  * |    |     REGBIT_{HAIRPIN/HAIRPIN_REPLY}           |   |                                   |
  * |    | REGBIT_ACL_HINT_{ALLOW_NEW/ALLOW/DROP/BLOCK} |   |                                   |
  * |    |     REGBIT_ACL_{LABEL/STATELESS}             | X |                                   |
- * +----+----------------------------------------------+ X |                                   |
- * | R5 |                   UNUSED                     | X |       LB_L2_AFF_BACKEND_IP6       |
- * | R1 |         ORIG_DIP_IPV4 (>= IN_PRE_STATEFUL)   | R |                                   |
- * +----+----------------------------------------------+ E |                                   |
+ * +----+----------------------------------------------+ X |       LB_L2_AFF_BACKEND_IP6       |
+ * | R1 |         ORIG_DIP_IPV4 (>= IN_PRE_STATEFUL)   | R |        (>= IN_LB_AFF_CHECK &&     |
+ * +----+----------------------------------------------+ E |         <= IN_LB_AFF_LEARN)       |
  * | R2 |         ORIG_TP_DPORT (>= IN_PRE_STATEFUL)   | G |                                   |
  * +----+----------------------------------------------+ 0 |                                   |
- * | R3 |                  ACL LABEL                   |   |                                   |
+ * | R3 |             OBS_POINT_ID_NEW                 |   |                                   |
+ * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |   |                                   |
  * +----+----------------------------------------------+---+-----------------------------------+
  * | R4 |            REG_LB_AFF_BACKEND_IP4            |   |                                   |
  * +----+----------------------------------------------+ X |                                   |
@@ -225,9 +228,11 @@  static bool vxlan_mode;
  * +----+----------------------------------------------+ G |                                   |
  * | R7 |                   UNUSED                     | 1 |                                   |
  * +----+----------------------------------------------+---+-----------------------------------+
- * | R8 |              LB_AFF_MATCH_PORT               |
+ * |    |              LB_AFF_MATCH_PORT               |
+ * |    |  (>= IN_LB_AFF_CHECK && <= IN_LB_AFF_LEARN)  |
  * +----+----------------------------------------------+
- * | R9 |                   UNUSED                     |
+ * | R9 |              OBS_POINT_ID_EST                |
+ * |    |       (>= ACL_EVAL* && <= ACL_ACTION*)       |
  * +----+----------------------------------------------+
  *
  * Logical Router pipeline:
@@ -6461,6 +6466,409 @@  build_acl_log(struct ds *actions, const struct nbrec_acl *acl,
     ds_put_cstr(actions, "); ");
 }
 
+/* This builds an ACL specific sample action.
+ * If the ACL has a label configured the label itself is used as sample
+ * observation point ID.  Otherwise the configured 'sample->metadata'
+ * is passed as observation point ID. */
+static void
+build_acl_sample_action(struct ds *actions, const struct nbrec_acl *acl,
+                        const struct nbrec_sample *sample,
+                        uint8_t sample_domain_id)
+{
+    if (!sample || sample_domain_id == SAMPLING_APP_ID_NONE) {
+        return;
+    }
+
+    uint32_t domain_id = 0;
+    uint32_t point_id = 0;
+
+    if (acl->label) {
+        domain_id = 0;
+        point_id = acl->label;
+    } else if (sample) {
+        domain_id = sample_domain_id;
+        point_id = sample->metadata;
+    }
+
+    for (size_t i = 0; i < sample->n_collectors; i++) {
+        ds_put_format(actions, "sample(probability=%"PRIu16","
+                               "collector_set=%hd,"
+                               "obs_domain=%"PRIu32","
+                               "obs_point=%"PRIu32");",
+                               (uint16_t) sample->collectors[i]->probability,
+                               (uint32_t) sample->collectors[i]->set_id,
+                               domain_id, point_id);
+    }
+}
+
+/* This builds an ACL logical flow specific action that stores the observation
+ * point IDs to be used for samples generated for traffic that hits the ACL.
+ * Two observation point IDs are stored in registers, the one for traffic
+ * that creates new connections and the one for traffic that's part of an
+ * existing connection.
+ */
+static void
+build_acl_sample_label_action(struct ds *actions, const struct nbrec_acl *acl,
+                              const struct nbrec_sample *sample_new,
+                              const struct nbrec_sample *sample_est)
+{
+    if (!acl->label && !sample_new && !sample_est) {
+        return;
+    }
+
+    uint32_t point_id_new = 0;
+    uint32_t point_id_est = 0;
+
+    if (acl->label) {
+        point_id_new = acl->label;
+        point_id_est = acl->label;
+    } else {
+        if (sample_new) {
+            point_id_new = sample_new->metadata;
+        }
+        if (sample_est) {
+            point_id_est = sample_est->metadata;
+        }
+    }
+
+    ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
+                           REG_OBS_POINT_ID_NEW " = %"PRIu32"; "
+                           REG_OBS_POINT_ID_EST " = %"PRIu32"; ",
+                  point_id_new, point_id_est);
+}
+
+/* This builds an ACL logical flow specific match that selects traffic
+ * with an associated observation point ID register equal to that of the
+ * ACL label (if configured) or sample->metadata.
+ */
+static void
+build_acl_sample_register_match(struct ds *match, const struct nbrec_acl *acl,
+                                const struct nbrec_sample *sample)
+{
+    uint32_t point_id = 0;
+
+    if (acl->label) {
+        point_id = acl->label;
+    } else if (sample) {
+        point_id = sample->metadata;
+    }
+
+    ds_put_format(match, REG_OBS_POINT_ID_NEW " == %"PRIu32, point_id);
+}
+
+/* This builds an ACL logical flow specific match that selects conntracked
+ * traffic whose associated ct_label.obs_point ID is equal to that of the
+ * ACL label (if configured) or sample->metadata.  The match also ensures
+ * that the observation domain ID stored in the ct_label is also equal to
+ * 'sample_domain_id'.
+ */
+static void
+build_acl_sample_label_match(struct ds *match, const struct nbrec_acl *acl,
+                             const struct nbrec_sample *sample,
+                             uint8_t sample_domain_id)
+{
+    uint32_t domain_id = 0;
+    uint32_t point_id = 0;
+
+    if (acl->label) {
+        domain_id = 0;
+        point_id = acl->label;
+    } else if (sample) {
+        domain_id = sample_domain_id;
+        point_id = sample->metadata;
+    }
+
+    /* Match on the complete ct_label to avoid masked access to it in the
+     * datapath.  Some NICs do not support HW offloading when masked-access
+     * of ct_label is used in the datapath. */
+    ds_put_format(match, "ct_label.obs_domain_id == %"PRIu32" && "
+                         "ct_label.obs_point_id == %"PRIu32" && "
+                         "ct_label.obs_unused == 0",
+                  domain_id, point_id);
+}
+
+/* This builds a logical flow that samples and forwards/drops traffic
+ * that hit a stateless ACL ("pass" or "allow-stateless") that has sampling
+ * enabled.
+ */
+static void
+build_acl_sample_new_stateless_flows(const struct ovn_datapath *od,
+                                     struct lflow_table *lflows,
+                                     enum ovn_stage stage,
+                                     struct ds *match, struct ds *actions,
+                                     const struct nbrec_acl *acl,
+                                     const char *reg_verdict,
+                                     const char *verdict_actions,
+                                     const char *copp_meter,
+                                     uint8_t sample_domain_id,
+                                     struct lflow_ref *lflow_ref)
+{
+    if (!acl->sample_new) {
+        return;
+    }
+
+    ds_clear(actions);
+    ds_clear(match);
+
+    ds_put_format(match, "ip && %s == 1 && ", reg_verdict);
+    build_acl_sample_register_match(match, acl, acl->sample_new);
+
+    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
+    ds_put_format(actions, " %s", verdict_actions);
+
+    ovn_lflow_metered(lflows, od, stage, 1100,
+                      ds_cstr(match), ds_cstr(actions),
+                      copp_meter, lflow_ref);
+}
+
+/* This builds a logical flow that samples and forwards/drops traffic
+ * that created a new conntrack entry and hit a stateful ACL that has sampling
+ * enabled.
+ */
+static void
+build_acl_sample_new_stateful_flows(const struct ovn_datapath *od,
+                                    struct lflow_table *lflows,
+                                    enum ovn_stage stage,
+                                    struct ds *match, struct ds *actions,
+                                    const struct nbrec_acl *acl,
+                                    const char *reg_verdict,
+                                    const char *verdict_actions,
+                                    const char *copp_meter,
+                                    uint8_t sample_domain_id,
+                                    struct lflow_ref *lflow_ref)
+{
+    if (!acl->sample_new) {
+        return;
+    }
+
+    ds_clear(actions);
+    ds_clear(match);
+
+    ds_put_format(match, "ip && ct.new && %s == 1 && ", reg_verdict);
+    build_acl_sample_register_match(match, acl, acl->sample_new);
+
+    build_acl_sample_action(actions, acl, acl->sample_new, sample_domain_id);
+    ds_put_format(actions, " %s", verdict_actions);
+
+    ovn_lflow_metered(lflows, od, stage, 1100,
+                      ds_cstr(match), ds_cstr(actions),
+                      copp_meter, lflow_ref);
+}
+
+/* This builds a logical flow that samples and forwards traffic
+ * that is part of an existing connection (in the original direction) created
+ * by traffic allowed by a stateful ACL that has sampling enabled.
+ */
+static void
+build_acl_sample_est_orig_stateful_flows(const struct ovn_datapath *od,
+                                         struct lflow_table *lflows,
+                                         enum ovn_stage stage,
+                                         struct ds *match, struct ds *actions,
+                                         const struct nbrec_acl *acl,
+                                         const char *reg_verdict,
+                                         const char *verdict_actions,
+                                         const char *copp_meter,
+                                         uint8_t sample_domain_id,
+                                         struct lflow_ref *lflow_ref)
+{
+    ds_clear(actions);
+    ds_clear(match);
+
+    ds_put_format(match, "ip && ct.trk && "
+                         "(ct.est || ct.rel) && "
+                         "!ct.rpl && %s == 1 && ",
+                  reg_verdict);
+    build_acl_sample_label_match(match, acl, acl->sample_est,
+                                 sample_domain_id);
+
+    build_acl_sample_action(actions, acl, acl->sample_est, sample_domain_id);
+    ds_put_format(actions, " %s", verdict_actions);
+
+    ovn_lflow_metered(lflows, od, stage, 1200,
+                      ds_cstr(match), ds_cstr(actions),
+                      copp_meter, lflow_ref);
+}
+
+/* This builds a logical flow that samples and forwards traffic
+ * that is part of an existing connection (in the reply direction) created
+ * by traffic allowed by a stateful ACL that has sampling enabled.
+ *
+ * NOTE: unlike for traffic in the original direction, this logical flow must
+ * be installed in the "opposite" pipeline.  That is, for "from-lport" ACLs
+ * the conntrack entry is created in the ingress logical port zone and will be
+ * hit by reply traffic in the egress pipeline (before being sent out that
+ * logical port).
+ */
+static void
+build_acl_sample_est_rpl_stateful_flows(const struct ovn_datapath *od,
+                                        struct lflow_table *lflows,
+                                        enum ovn_stage rpl_stage,
+                                        struct ds *match, struct ds *actions,
+                                        const struct nbrec_acl *acl,
+                                        const char *reg_verdict,
+                                        const char *verdict_actions,
+                                        const char *copp_meter,
+                                        uint8_t sample_domain_id,
+                                        struct lflow_ref *lflow_ref)
+{
+    ds_clear(actions);
+    ds_clear(match);
+
+    ds_put_format(match, "ip && ct.trk && "
+                         "(ct.est || ct.rel) && "
+                         "ct.rpl && %s == 1 && ",
+                  reg_verdict);
+    build_acl_sample_label_match(match, acl, acl->sample_est,
+                                 sample_domain_id);
+
+    build_acl_sample_action(actions, acl, acl->sample_est, sample_domain_id);
+    ds_put_format(actions, " %s", verdict_actions);
+
+    ovn_lflow_metered(lflows, od, rpl_stage, 1200,
+                      ds_cstr(match), ds_cstr(actions),
+                      copp_meter, lflow_ref);
+}
+
+/* This builds logical flows that sample and forward traffic
+ * that is part of an existing connection (both in the original and in the
+ * reply direction) created by traffic allowed by a stateful ACL that has
+ * sampling enabled.
+ */
+static void
+build_acl_sample_est_stateful_flows(const struct ovn_datapath *od,
+                                    struct lflow_table *lflows,
+                                    enum ovn_stage stage,
+                                    struct ds *match, struct ds *actions,
+                                    const struct nbrec_acl *acl,
+                                    const char *reg_verdict,
+                                    const char *verdict_actions,
+                                    const char *copp_meter,
+                                    uint8_t sample_domain_id,
+                                    struct lflow_ref *lflow_ref)
+{
+    if (!acl->sample_est) {
+        return;
+    }
+    build_acl_sample_est_orig_stateful_flows(od, lflows, stage, match, actions,
+                                             acl, reg_verdict, verdict_actions,
+                                             copp_meter, sample_domain_id,
+                                             lflow_ref);
+
+    /* Install flows in the "opposite" pipeline direction to handle reply
+     * traffic on established connections. */
+    enum ovn_stage rpl_stage = (stage == S_SWITCH_OUT_ACL_ACTION
+                                ? S_SWITCH_IN_ACL_ACTION
+                                : S_SWITCH_OUT_ACL_ACTION);
+    build_acl_sample_est_rpl_stateful_flows(od, lflows, rpl_stage,
+                                            match, actions,
+                                            acl, reg_verdict, verdict_actions,
+                                            copp_meter, sample_domain_id,
+                                            lflow_ref);
+}
+
+static void build_acl_reject_action(struct ds *actions, bool is_ingress);
+
+/* This builds all ACL sampling related logical flows:
+ * - for packets creating new connections
+ * - for packets that are part of an existing connection
+ */
+static void
+build_acl_sample_flows(const struct ls_stateful_record *ls_stateful_rec,
+                       const struct ovn_datapath *od,
+                       struct lflow_table *lflows,
+                       const struct nbrec_acl *acl,
+                       const struct shash *meter_groups,
+                       struct ds *match, struct ds *actions,
+                       const struct sampling_app_table *sampling_apps,
+                       struct lflow_ref *lflow_ref)
+{
+    bool should_sample_established =
+        ls_stateful_rec->has_stateful_acl
+        && acl->sample_est
+        && !strcmp(acl->action, "allow-related");
+
+    bool stateful_match =
+        ls_stateful_rec->has_stateful_acl
+        && strcmp(acl->action, "allow-stateless");
+
+    /* Only sample if:
+     * - sampling is enabled for traffic creating new connections
+     * OR
+     * - sampling is enabled for traffic on established sessions and the
+     *   switch has stateful ACLs.
+     */
+    if (!acl->sample_new && !should_sample_established) {
+        return;
+    }
+
+    bool ingress = !strcmp(acl->direction, "from-lport") ? true : false;
+    enum ovn_stage stage;
+
+    if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
+        stage = S_SWITCH_IN_ACL_AFTER_LB_ACTION;
+    } else if (ingress) {
+        stage = S_SWITCH_IN_ACL_ACTION;
+    } else {
+        stage = S_SWITCH_OUT_ACL_ACTION;
+    }
+
+    struct ds verdict_actions = DS_EMPTY_INITIALIZER;
+    ds_put_cstr(&verdict_actions, REGBIT_ACL_VERDICT_ALLOW " = 0; "
+                                  REGBIT_ACL_VERDICT_DROP " = 0; "
+                                  REGBIT_ACL_VERDICT_REJECT " = 0; ");
+    if (ls_stateful_rec->max_acl_tier) {
+        ds_put_cstr(&verdict_actions, REG_ACL_TIER " = 0; ");
+    }
+
+    /* Follow the same verdict determination logic as in
+     * build_acl_action_lflows().
+     */
+    const char *reg_verdict = REGBIT_ACL_VERDICT_ALLOW;
+    const char *copp_meter = NULL;
+    if (!strcmp(acl->action, "drop")) {
+        reg_verdict = REGBIT_ACL_VERDICT_DROP;
+        ds_put_cstr(&verdict_actions, debug_implicit_drop_action());
+    } else if (!strcmp(acl->action, "reject")) {
+        reg_verdict = REGBIT_ACL_VERDICT_REJECT;
+        copp_meter = copp_meter_get(COPP_REJECT, od->nbs->copp, meter_groups);
+        build_acl_reject_action(&verdict_actions, ingress);
+    } else if (!strcmp(acl->action, "allow")
+               || !strcmp(acl->action, "allow-related")) {
+        reg_verdict = REGBIT_ACL_VERDICT_ALLOW;
+        ds_put_cstr(&verdict_actions, "next;");
+    } else {
+        /* XXX: ACLs with action "pass" do not support sampling. */
+        goto done;
+    }
+
+    uint8_t sample_new_domain_id =
+        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_NEW_TRAFFIC);
+    uint8_t sample_est_domain_id =
+        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_EST_TRAFFIC);
+
+    if (!stateful_match) {
+        build_acl_sample_new_stateless_flows(od, lflows, stage, match, actions,
+                                             acl, reg_verdict,
+                                             ds_cstr_ro(&verdict_actions),
+                                             copp_meter, sample_new_domain_id,
+                                             lflow_ref);
+    } else {
+        build_acl_sample_new_stateful_flows(od, lflows, stage, match, actions,
+                                            acl, reg_verdict,
+                                            ds_cstr_ro(&verdict_actions),
+                                            copp_meter, sample_new_domain_id,
+                                            lflow_ref);
+        build_acl_sample_est_stateful_flows(od, lflows, stage, match, actions,
+                                            acl, reg_verdict,
+                                            ds_cstr_ro(&verdict_actions),
+                                            copp_meter, sample_est_domain_id,
+                                            lflow_ref);
+    }
+
+done:
+    ds_destroy(&verdict_actions);
+}
+
 static void
 consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
              const struct nbrec_acl *acl, bool has_stateful,
@@ -6508,6 +6916,10 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
     if (!has_stateful
         || !strcmp(acl->action, "pass")
         || !strcmp(acl->action, "allow-stateless")) {
+
+        /* For stateless ACLs just sample "new" packets. */
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
+
         ds_put_cstr(actions, "next;");
         ds_put_format(match, "(%s)", acl->match);
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
@@ -6542,10 +6954,10 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
 
         ds_truncate(actions, log_verdict_len);
         ds_put_cstr(actions, REGBIT_CONNTRACK_COMMIT" = 1; ");
-        if (acl->label) {
-            ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
-                          REG_LABEL" = %"PRId64"; ", acl->label);
-        }
+
+        /* For stateful ACLs sample "new" and "established" packets. */
+        build_acl_sample_label_action(actions, acl, acl->sample_new,
+                                      acl->sample_est);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6565,9 +6977,11 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
                       acl->match);
         if (acl->label) {
             ds_put_cstr(actions, REGBIT_CONNTRACK_COMMIT" = 1; ");
-            ds_put_format(actions, REGBIT_ACL_LABEL" = 1; "
-                          REG_LABEL" = %"PRId64"; ", acl->label);
         }
+
+        /* For stateful ACLs sample "new" and "established" packets. */
+        build_acl_sample_label_action(actions, acl, acl->sample_new,
+                                      acl->sample_est);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6585,6 +6999,9 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
         ds_put_format(match, " && (%s)", acl->match);
 
         ds_truncate(actions, log_verdict_len);
+
+        /* For drop ACLs just sample all packets as "new" packets. */
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
         ds_put_cstr(actions, "next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6605,6 +7022,9 @@  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
         ds_put_format(match, " && (%s)", acl->match);
 
         ds_truncate(actions, log_verdict_len);
+
+        /* For drop ACLs just sample all packets as "new" packets. */
+        build_acl_sample_label_action(actions, acl, acl->sample_new, NULL);
         ds_put_cstr(actions, "ct_commit { ct_mark.blocked = 1; }; next;");
         ovn_lflow_add_with_hint(lflows, od, stage, priority,
                                 ds_cstr(match), ds_cstr(actions),
@@ -6685,6 +7105,20 @@  ovn_update_ipv6_options(struct hmap *lr_ports)
 
 #define IPV6_CT_OMIT_MATCH "nd || nd_ra || nd_rs || mldv1 || mldv2"
 
+static void
+build_acl_reject_action(struct ds *actions, bool is_ingress)
+{
+    ds_put_format(
+        actions, "reg0 = 0; "
+        "reject { "
+          "/* eth.dst <-> eth.src; ip.dst <-> ip.src; is implicit. */ "
+          "outport <-> inport; next(pipeline=%s,table=%d); "
+        "};",
+        is_ingress ? "egress" : "ingress",
+        is_ingress ? ovn_stage_get_table(S_SWITCH_OUT_QOS)
+            : ovn_stage_get_table(S_SWITCH_IN_L2_LKUP));
+}
+
 static void
 build_acl_action_lflows(const struct ls_stateful_record *ls_stateful_rec,
                         const struct ovn_datapath *od,
@@ -6731,14 +7165,7 @@  build_acl_action_lflows(const struct ls_stateful_record *ls_stateful_rec,
         bool ingress = ovn_stage_get_pipeline(stage) == P_IN;
 
         ds_truncate(actions, verdict_len);
-        ds_put_format(
-            actions, "reg0 = 0; "
-            "reject { "
-            "/* eth.dst <-> eth.src; ip.dst <-> ip.src; is implicit. */ "
-            "outport <-> inport; next(pipeline=%s,table=%d); };",
-            ingress ? "egress" : "ingress",
-            ingress ? ovn_stage_get_table(S_SWITCH_OUT_QOS)
-                : ovn_stage_get_table(S_SWITCH_IN_L2_LKUP));
+        build_acl_reject_action(actions, ingress);
 
         ovn_lflow_metered(lflows, od, stage, 1000,
                           REGBIT_ACL_VERDICT_REJECT " == 1", ds_cstr(actions),
@@ -6778,12 +7205,6 @@  build_acl_log_related_flows(const struct ovn_datapath *od,
      * the ACL, then we need to ensure that the related and reply
      * traffic is logged, so we install a slightly higher-priority
      * flow that matches the ACL, allows the traffic, and logs it.
-     *
-     * Note: Matching the ct_label.label may prevent OVS flow HW
-     * offloading to work for some NICs because masked-access of
-     * ct_label is not supported on those NICs due to HW
-     * limitations. In such case the user may choose to avoid using the
-     * "log-related" option.
      */
     bool ingress = !strcmp(acl->direction, "from-lport") ? true :false;
     bool log_related = smap_get_bool(&acl->options, "log-related",
@@ -6842,6 +7263,7 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
            struct lflow_table *lflows,
            const struct ls_port_group_table *ls_port_groups,
            const struct shash *meter_groups,
+           const struct sampling_app_table *sampling_apps,
            struct lflow_ref *lflow_ref)
 {
     const char *default_acl_action = default_acl_drop
@@ -7031,6 +7453,9 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
         consider_acl(lflows, od, acl, has_stateful,
                      meter_groups, ls_stateful_rec->max_acl_tier,
                      &match, &actions, lflow_ref);
+        build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
+                               meter_groups, &match, &actions,
+                               sampling_apps, lflow_ref);
     }
 
     const struct ls_port_group *ls_pg =
@@ -7047,6 +7472,9 @@  build_acls(const struct ls_stateful_record *ls_stateful_rec,
                 consider_acl(lflows, od, acl, has_stateful,
                              meter_groups, ls_stateful_rec->max_acl_tier,
                              &match, &actions, lflow_ref);
+                build_acl_sample_flows(ls_stateful_rec, od, lflows, acl,
+                                       meter_groups, &match, &actions,
+                                       sampling_apps, lflow_ref);
             }
         }
     }
@@ -7691,8 +8119,11 @@  build_lb_rules(struct lflow_table *lflows, struct ovn_lb_datapaths *lb_dps,
 
 static void
 build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
+               const struct sampling_app_table *sampling_apps,
                struct lflow_ref *lflow_ref)
 {
+    uint8_t sample_domain_est =
+        sampling_app_get_id(sampling_apps, SAMPLING_APP_ACL_EST_TRAFFIC);
     struct ds actions = DS_EMPTY_INITIALIZER;
 
     /* Ingress LB, Ingress and Egress stateful Table (Priority 0): Packets are
@@ -7709,8 +8140,13 @@  build_stateful(struct ovn_datapath *od, struct lflow_table *lflows,
      * We always set ct_mark.blocked to 0 here as
      * any packet that makes it this far is part of a connection we
      * want to allow to continue. */
-    ds_put_cstr(&actions, "ct_commit { ct_mark.blocked = 0; "
-                          "ct_label.label = " REG_LABEL "; }; next;");
+    ds_put_format(&actions,
+                 "ct_commit { "
+                    "ct_mark.blocked = 0; "
+                    "ct_label.obs_domain_id = %"PRIu8 "; "
+                    "ct_label.obs_point_id = " REG_OBS_POINT_ID_EST "; "
+                  "}; next;",
+                  sample_domain_est);
     ovn_lflow_add(lflows, od, S_SWITCH_IN_STATEFUL, 100,
                   REGBIT_CONNTRACK_COMMIT" == 1 && "
                   REGBIT_ACL_LABEL" == 1",
@@ -8718,6 +9154,7 @@  build_lswitch_lflows_pre_acl_and_acl(
     struct ovn_datapath *od,
     struct lflow_table *lflows,
     const struct shash *meter_groups,
+    const struct sampling_app_table *sampling_apps,
     struct lflow_ref *lflow_ref)
 {
     ovs_assert(od->nbs);
@@ -8725,7 +9162,7 @@  build_lswitch_lflows_pre_acl_and_acl(
     build_pre_lb(od, meter_groups, lflows, lflow_ref);
     build_pre_stateful(od, lflows, lflow_ref);
     build_qos(od, lflows, lflow_ref);
-    build_stateful(od, lflows, lflow_ref);
+    build_stateful(od, lflows, sampling_apps, lflow_ref);
     build_vtep_hairpin(od, lflows, lflow_ref);
 }
 
@@ -15755,6 +16192,7 @@  build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
                         const struct ovn_datapath *od,
                         const struct ls_port_group_table *ls_pgs,
                         const struct shash *meter_groups,
+                        const struct sampling_app_table *sampling_apps,
                         struct lflow_table *lflows)
 {
     build_ls_stateful_rec_pre_acls(ls_stateful_rec, od, ls_pgs, lflows,
@@ -15764,7 +16202,7 @@  build_ls_stateful_flows(const struct ls_stateful_record *ls_stateful_rec,
     build_acl_hints(ls_stateful_rec, od, lflows,
                     ls_stateful_rec->lflow_ref);
     build_acls(ls_stateful_rec, od, lflows, ls_pgs, meter_groups,
-               ls_stateful_rec->lflow_ref);
+               sampling_apps, ls_stateful_rec->lflow_ref);
     build_lb_hairpin(ls_stateful_rec, od, lflows, ls_stateful_rec->lflow_ref);
 }
 
@@ -15788,6 +16226,7 @@  struct lswitch_flow_build_info {
     struct ds actions;
     size_t thread_lflow_counter;
     const char *svc_monitor_mac;
+    const struct sampling_app_table *sampling_apps;
 };
 
 /* Helper function to combine all lflow generation which is iterated by
@@ -15802,7 +16241,8 @@  build_lswitch_and_lrouter_iterate_by_ls(struct ovn_datapath *od,
 {
     ovs_assert(od->nbs);
     build_lswitch_lflows_pre_acl_and_acl(od, lsi->lflows,
-                                         lsi->meter_groups, NULL);
+                                         lsi->meter_groups,
+                                         lsi->sampling_apps, NULL);
 
     build_fwd_group_lflows(od, lsi->lflows, NULL);
     build_lswitch_lflows_admission_control(od, lsi->lflows, NULL);
@@ -16079,6 +16519,7 @@  build_lflows_thread(void *arg)
                     build_ls_stateful_flows(ls_stateful_rec, od,
                                             lsi->ls_port_groups,
                                             lsi->meter_groups,
+                                            lsi->sampling_apps,
                                             lsi->lflows);
                 }
             }
@@ -16152,7 +16593,8 @@  build_lswitch_and_lrouter_flows(
     const struct hmap *svc_monitor_map,
     const struct hmap *bfd_connections,
     const struct chassis_features *features,
-    const char *svc_monitor_mac)
+    const char *svc_monitor_mac,
+    const struct sampling_app_table *sampling_apps)
 {
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
@@ -16186,6 +16628,7 @@  build_lswitch_and_lrouter_flows(
             lsiv[index].svc_check_match = svc_check_match;
             lsiv[index].thread_lflow_counter = 0;
             lsiv[index].svc_monitor_mac = svc_monitor_mac;
+            lsiv[index].sampling_apps = sampling_apps;
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
@@ -16226,6 +16669,7 @@  build_lswitch_and_lrouter_flows(
             .features = features,
             .svc_check_match = svc_check_match,
             .svc_monitor_mac = svc_monitor_mac,
+            .sampling_apps = sampling_apps,
             .match = DS_EMPTY_INITIALIZER,
             .actions = DS_EMPTY_INITIALIZER,
         };
@@ -16298,6 +16742,7 @@  build_lswitch_and_lrouter_flows(
                                    &od->nbs->header_.uuid));
             build_ls_stateful_flows(ls_stateful_rec, od, lsi.ls_port_groups,
                                     lsi.meter_groups,
+                                    lsi.sampling_apps,
                                     lsi.lflows);
         }
         stopwatch_stop(LFLOWS_LS_STATEFUL_STOPWATCH_NAME, time_msec());
@@ -16387,7 +16832,8 @@  void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
                                     input_data->svc_monitor_map,
                                     input_data->bfd_connections,
                                     input_data->features,
-                                    input_data->svc_monitor_mac);
+                                    input_data->svc_monitor_mac,
+                                    input_data->sampling_apps);
 
     if (parallelization_state == STATE_INIT_HASH_SIZES) {
         parallelization_state = STATE_USE_PARALLELIZATION;
@@ -16811,6 +17257,7 @@  lflow_handle_ls_stateful_changes(struct ovsdb_idl_txn *ovnsb_txn,
         build_ls_stateful_flows(ls_stateful_rec, od,
                                 lflow_input->ls_port_groups,
                                 lflow_input->meter_groups,
+                                lflow_input->sampling_apps,
                                 lflows);
 
         /* Sync the new flows to SB. */
diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
index b06b09ac5f..ca97a9b703 100644
--- a/northd/ovn-northd.8.xml
+++ b/northd/ovn-northd.8.xml
@@ -899,6 +899,32 @@ 
         next(pipeline=egress,table=5);}</code> action for SCTP associations.
       </li>
 
+      <li>
+        For each ACL with sample_new configured a priority 1100 flow is
+        installed that additionally matches on the saved observation_point_id
+        value.  This flow also generates a <code>sample()</code> action along
+        with the usual action processing for the type of ACL.
+      </li>
+
+      <li>
+        For each ACL with sample_est configured a priority 1200 flow is
+        installed that additionally matches on the saved observation_point_id
+        value for established traffic in the original direction.  This flow
+        also generates a <code>sample()</code> action along with the usual
+        action processing for the type of ACL.
+      </li>
+
+      <li>
+        For each ACL with sample_est configured a priority 1200 flow is
+        installed that additionally matches on the saved observation_point_id
+        value for established traffic in the reply direction.  This flow also
+        generates a <code>sample()</code> action along with the usual action
+        processing for the type of ACL.  Note: this flow is installed in the
+        opposite pipeline (in the ingress pipeline for ACLs applied in the
+        egress direction and in the egress pipeline for ACLs applied in the
+        ingress direction).
+      </li>
+
       <li>
         If any ACLs have tiers configured on them, then three priority 500
         flows are installed. If the current tier counter is 0, 1, or 2, then
diff --git a/ovn-nb.ovsschema b/ovn-nb.ovsschema
index e131a4c083..6d26f468c7 100644
--- a/ovn-nb.ovsschema
+++ b/ovn-nb.ovsschema
@@ -1,7 +1,7 @@ 
 {
     "name": "OVN_Northbound",
-    "version": "7.4.0",
-    "cksum": "1498303893 36355",
+    "version": "7.5.0",
+    "cksum": "102122798 38439",
     "tables": {
         "NB_Global": {
             "columns": {
@@ -30,6 +30,40 @@ 
                 "ipsec": {"type": "boolean"}},
             "maxRows": 1,
             "isRoot": true},
+        "Sample_Collector": {
+            "columns": {
+                "name": {"type": "string"},
+                "probability": {"type": {"key": {
+                    "type": "integer",
+                    "minInteger": 0,
+                    "maxInteger": 65535}}},
+                "set_id": {"type": {"key": {
+                    "type": "integer",
+                    "minInteger": 0,
+                    "maxInteger": 4294967295}}},
+                "external_ids": {"type": {"key": "string", "value": "string",
+                                          "min": 0, "max": "unlimited"}}
+            },
+            "indexes": [["name"]],
+            "isRoot": true
+        },
+        "Sample": {
+            "columns": {
+                "collectors": {"type": {"key": {"type": "uuid",
+                                                "refTable": "Sample_Collector",
+                                                "refType": "strong"},
+                                        "min": 0,
+                                        "max": "unlimited"}},
+                "metadata": {"type": {"key": {"type": "integer",
+                                              "minInteger": 1,
+                                              "maxInteger": 4294967295},
+                                      "min": 1, "max":1}},
+                "external_ids": {"type": {"key": "string", "value": "string",
+                                          "min": 0, "max": "unlimited"}}
+            },
+            "indexes": [["metadata"]],
+            "isRoot": true
+        },
         "Copp": {
             "columns": {
                 "name": {"type": "string"},
@@ -275,6 +309,14 @@ 
                 "tier": {"type": {"key": {"type": "integer",
                                           "minInteger": 0,
                                           "maxInteger": 3}}},
+                "sample_new": {"type": {"key": {"type": "uuid",
+                                                "refTable": "Sample",
+                                                "refType": "strong"},
+                                        "min": 0, "max": 1}},
+                "sample_est": {"type": {"key": {"type": "uuid",
+                                                "refTable": "Sample",
+                                                "refType": "strong"},
+                                        "min": 0, "max": 1}},
                 "options": {
                      "type": {"key": "string",
                               "value": "string",
diff --git a/ovn-nb.xml b/ovn-nb.xml
index f2a8b5c076..480144ddc1 100644
--- a/ovn-nb.xml
+++ b/ovn-nb.xml
@@ -490,6 +490,48 @@ 
 
   </table>
 
+  <table name="Sample_Collector" title="Sample_Collector">
+    <column name="name">
+      Sample collector name.
+    </column>
+    <column name="probability">
+      Sampling probability for this collector.  It must be an integer number
+      between 0 and 65535.  A value of 0 corresponds to no packets being
+      sampled while a value of 65535 corresponds to all packets being sampled.
+    </column>
+    <column name="set_id">
+      The 32-bit integer identifier of the set of of collectors to send
+      packets to. See Flow_Sample_Collector_Set Table in ovs-vswitchd's
+      database schema.
+    </column>
+    <column name="external_ids">
+      See <em>External IDs</em> at the beginning of this document.
+    </column>
+  </table>
+
+  <table name="Sample" title="Sample">
+    <p>
+      This table describes a Sampling configuration. Entries in other tables
+      might be associated with Sample entries to indicate how the sample
+      should be generated.
+
+      For an example, see <ref table="ACL"/>.
+    </p>
+    <column name="collectors">
+      A list of references to <ref table="Sample_Collector"/> records to be
+      used when generating samples (e.g., IPFIX).  A sample can be sent to
+      multiple collectors simultaneously.
+    </column>
+    <column name="metadata">
+      Will be used as Observation Point ID in every sample.  The Observation
+      Domain ID will be generated by ovn-northd and includes the logical
+      datapath key as the least significant 24 bits and the sampling
+      application type (e.g., drop debugging) as the 8 most significant bits.
+    </column>
+    <column name="external_ids">
+      See <em>External IDs</em> at the beginning of this document.
+    </column>
+  </table>
   <table name="Copp" title="Control plane protection">
     <p>
       This table is used to define control plane protection policies, i.e.,
@@ -2500,6 +2542,27 @@  or
       </column>
     </group>
 
+    <column name="sample_new">
+      <p>
+        The entry in the <ref table="Sample"/> table to use for sampling for
+        new sessions matched by this ACL.  In case the ACL is stateless
+        this is used for sampling all traffic matched by the ACL.
+
+        NOTE: ACLs with action <code>pass</code> currently do not support
+        sampling of traffic on new sessions.
+      </p>
+    </column>
+
+    <column name="sample_est">
+      <p>
+        The entry in the <ref table="Sample"/> table to use for sampling for
+        established/related sessions matched by this ACL.
+
+        NOTE: ACLs with action <code>pass</code> currently do not support
+        sampling of traffic on established sessions.
+      </p>
+    </column>
+
     <group title="Common Columns">
       <column name="options">
         This column provides general key/value settings. The supported
diff --git a/tests/atlocal.in b/tests/atlocal.in
index 32d1c374ea..29e1bb2982 100644
--- a/tests/atlocal.in
+++ b/tests/atlocal.in
@@ -196,6 +196,12 @@  find_command bfdd-beacon
 # Set HAVE_ARPING
 find_command arping
 
+# Set HAVE_NFCAPD
+find_command nfcapd
+
+# Set HAVE_NFDUMP
+find_command nfdump
+
 # Turn off proxies.
 unset http_proxy
 unset https_proxy
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index 47ada5c70e..863d85daf4 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -1049,6 +1049,10 @@  ovn_strip_lflows() {
      sed 's/table=[[0-9]]\{1,2\}\s\?/table=??/g' | sort
 }
 
+ovn_strip_collector_set() {
+    sed 's/collector_set=[[0-9]]*,\?/collector_set=??,/g'
+}
+
 OVS_END_SHELL_HELPERS
 
 m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
diff --git a/tests/ovn-nbctl.at b/tests/ovn-nbctl.at
index 31de309215..0905a021eb 100644
--- a/tests/ovn-nbctl.at
+++ b/tests/ovn-nbctl.at
@@ -2802,6 +2802,26 @@  check_row_count nb:ACL 0
 
 dnl ---------------------------------------------------------------------
 
+OVN_NBCTL_TEST([acl_sampling], [ACL sampling operations], [
+check ovn-nbctl ls-add ls
+sample1=$(ovn-nbctl create sample metadata=4301)
+sample2=$(ovn-nbctl create sample metadata=4302)
+check_row_count nb:Sample 2
+
+check ovn-nbctl --sample-new=$sample1 acl-add ls from-lport 1 1 allow-related
+check_column "$sample1" nb:ACL sample_new priority=1
+
+check ovn-nbctl --sample-est=$sample2 acl-add ls from-lport 2 1 allow-related
+check_column "" nb:ACL sample_new priority=2
+check_column "$sample2" nb:ACL sample_est priority=2
+
+check ovn-nbctl --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 3 1 allow-related
+check_column "$sample1" nb:ACL sample_new priority=3
+check_column "$sample2" nb:ACL sample_est priority=3
+])
+
+dnl ---------------------------------------------------------------------
+
 AT_SETUP([ovn-nbctl - daemon retry connection])
 OVN_NBCTL_TEST_START daemon
 pid=$(cat ovsdb-server.pid)
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 3fb883225a..a6843ce998 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -4604,7 +4604,7 @@  check_stateful_flows() {
     AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
     AT_CHECK_UNQUOTED([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
@@ -4628,7 +4628,7 @@  check_stateful_flows() {
     AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 }
 
@@ -4670,7 +4670,7 @@  AT_CHECK([grep "ls_in_lb " sw0flows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep "ls_out_pre_lb" sw0flows | ovn_strip_lflows], [0], [dnl
@@ -4691,7 +4691,7 @@  AT_CHECK([grep "ls_out_pre_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # LB with event=false and reject=false
@@ -4720,23 +4720,23 @@  ovn-sbctl dump-flows sw0 > sw0flows
 AT_CAPTURE_FILE([sw0flows])
 
 AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
 ])
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
 ])
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # Add new ACL without label
@@ -4747,27 +4747,27 @@  ovn-sbctl dump-flows sw0 > sw0flows
 AT_CAPTURE_FILE([sw0flows])
 
 AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
   table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
-  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
   table=??(ls_in_acl_eval     ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
 ])
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
   table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[7]] == 1 && (udp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; next;)
-  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; next;)
+  table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (tcp)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 1234; reg9 = 1234; next;)
   table=??(ls_out_acl_eval    ), priority=2002 , match=(reg0[[8]] == 1 && (udp)), action=(reg8[[16]] = 1; next;)
 ])
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 # Delete new ACL with label
@@ -4784,7 +4784,7 @@  AT_CHECK([grep -w "ls_in_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0]
 AT_CHECK([grep "ls_in_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0], [dnl
@@ -4794,7 +4794,7 @@  AT_CHECK([grep -w "ls_out_acl_eval" sw0flows | grep 2002 | ovn_strip_lflows], [0
 AT_CHECK([grep "ls_out_stateful" sw0flows | ovn_strip_lflows], [0], [dnl
   table=??(ls_out_stateful    ), priority=0    , match=(1), action=(next;)
   table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_out_stateful    ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 AT_CLEANUP
 ])
@@ -4822,7 +4822,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls from-lport 1
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
 ])
 
 AS_BOX([from-lport --apply-after-lb allow-related ACL])
@@ -4830,7 +4830,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --apply-after-lb --label=1234 acl-add
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_in_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
 ])
 
 AS_BOX([to-lport allow-related ACL])
@@ -4838,7 +4838,7 @@  check ovn-nbctl --wait=sb -- acl-del ls -- --label=1234 acl-add ls to-lport 1 ip
 
 dnl Check that the label is committed to conntrack in the ingress pipeline
 AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new --ct new ls "$flow" | grep -e ls_out_stateful -A 2 | grep commit], [0], [dnl
-    ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; };
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; };
 ])
 
 AT_CLEANUP
@@ -7658,7 +7658,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AS_BOX([Remove and add the ACLs back with the apply-after-lb option])
@@ -7713,7 +7713,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AS_BOX([Remove and add the ACLs back with a few ACLs with apply-after-lb option])
@@ -7768,7 +7768,7 @@  AT_CHECK([grep -e "ls_in_lb " lsflows | ovn_strip_lflows], [0], [dnl
 AT_CHECK([grep -e "ls_in_stateful" lsflows | ovn_strip_lflows], [0], [dnl
   table=??(ls_in_stateful     ), priority=0    , match=(1), action=(next;)
   table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 0), action=(ct_commit { ct_mark.blocked = 0; }; next;)
-  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.label = reg3; }; next;)
+  table=??(ls_in_stateful     ), priority=100  , match=(reg0[[1]] == 1 && reg0[[13]] == 1), action=(ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 0; ct_label.obs_point_id = reg9; }; next;)
 ])
 
 AT_CLEANUP
@@ -12491,6 +12491,198 @@  AT_CHECK([ovn-sbctl lflow-list | grep ls_in_l2_unknown.*sample | ovn_strip_lflow
 AT_CLEANUP
 ])
 
+OVN_FOR_EACH_NORTHD_NO_HV([
+AT_SETUP([ACL Sampling])
+AT_KEYWORDS([acl])
+
+ovn_start
+
+collector1=$(ovn-nbctl create Sample_Collector name=test-collector1 probability=65535 set_id=1)
+collector2=$(ovn-nbctl create Sample_Collector name=test-collector2 probability=65535 set_id=2)
+check_row_count nb:Sample_Collector 2
+
+ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" id="42"
+ovn-nbctl create Sampling_App name="acl-est-traffic-sampling" id="43"
+check_row_count nb:Sampling_App 2
+
+sample1=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=4301)
+sample2=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=4302)
+check_row_count nb:Sample 2
+
+check ovn-nbctl                               \
+  -- ls-add ls                                \
+  -- lsp-add ls lsp1                          \
+  -- lsp-set-addresses lsp1 00:00:00:00:00:01 \
+  -- lsp-add ls lsp2                          \
+  -- lsp-set-addresses lsp2 00:00:00:00:00:02
+check ovn-nbctl --wait=sb sync
+
+base_flow="inport == \"lsp1\" && eth.src == 00:00:00:00:00:01 && eth.dst == 00:00:00:00:00:02 && ip4.src == 42.42.42.1 && ip4.dst == 42.42.42.2"
+
+AS_BOX([from-lport ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_action -e ls_in_acl_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_action   ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_action   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
+    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
+])
+
+AS_BOX([from-lport ACL sampling (new)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --sample-new=$sample1 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_action -e ls_in_acl_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_action   ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_in_acl_eval     ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    reg9 = 0;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 0;
+])
+
+AS_BOX([from-lport-after-lb ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --apply-after-lb --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_action -e ls_in_acl_after_lb_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_after_lb_action), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_after_lb_action), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
+    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
+])
+
+AS_BOX([from-lport-after-lb ACL sampling (new)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --apply-after-lb --sample-new=$sample1 acl-add ls from-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_in_acl_after_lb_action -e ls_in_acl_after_lb_eval -e ls_out_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_after_lb_action), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_in_acl_after_lb_eval), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    reg9 = 0;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 0;
+])
+
+AS_BOX([to-lport ACL sampling (new, est)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --sample-new=$sample1 --sample-est=$sample2 acl-add ls to-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_action -e ls_out_acl_eval -e ls_in_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_in_acl_action   ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_out_acl_action  ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_out_acl_action  ), priority=1200 , match=(ip && ct.trk && (ct.est || ct.rel) && !ct.rpl && reg8[[16]] == 1 && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302 && ct_label.obs_unused == 0), action=(sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302);sample(probability=65535,collector_set=??,obs_domain=43,obs_point=4302); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 4302; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; };
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace estasblished connections.
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 4302"
+AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 4302;
+    sample(probability=65535,collector_set=1,obs_domain=43,obs_point=4302);
+    sample(probability=65535,collector_set=2,obs_domain=43,obs_point=4302);
+])
+
+AS_BOX([to-lport ACL sampling (new)])
+check ovn-nbctl acl-del ls
+check ovn-nbctl --wait=sb --sample-new=$sample1 acl-add ls to-lport 1 "1" allow-related
+AT_CHECK([ovn-sbctl lflow-list | grep -e ls_out_acl_action -e ls_out_acl_eval -e ls_in_acl_action | ovn_strip_lflows | ovn_strip_collector_set | grep -e reg3 -e reg9 -e sample], [0], [dnl
+  table=??(ls_out_acl_action  ), priority=1100 , match=(ip && ct.new && reg8[[16]] == 1 && reg3 == 4301), action=(sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301);sample(probability=65535,collector_set=??,obs_domain=42,obs_point=4301); reg8[[16]] = 0; reg8[[17]] = 0; reg8[[18]] = 0; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[7]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[1]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+  table=??(ls_out_acl_eval    ), priority=1001 , match=(reg0[[8]] == 1 && (1)), action=(reg8[[16]] = 1; reg0[[13]] = 1; reg3 = 4301; reg9 = 0; next;)
+])
+
+dnl Trace new connections.
+flow="$base_flow"
+AT_CHECK_UNQUOTED([ovn_trace --ct new --ct new ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    ct_commit { ct_mark.blocked = 0; ct_label.obs_domain_id = 43; ct_label.obs_point_id = reg9; };
+    ct_commit { ct_mark.blocked = 0; };
+    reg9 = 0;
+    sample(probability=65535,collector_set=1,obs_domain=42,obs_point=4301);
+    sample(probability=65535,collector_set=2,obs_domain=42,obs_point=4301);
+])
+
+dnl Trace established connections (no point id was committed in the label in
+dnl the original direction).
+flow="$base_flow && ct_label.obs_domain_id == 43 && ct_label.obs_point_id == 0"
+AT_CHECK_UNQUOTED([ovn_trace --ct est --ct est ls "$flow" | grep -e sample -e commit -e reg9 | sort], [0], [dnl
+    reg9 = 0;
+])
+
+AT_CLEANUP
+])
+
 OVN_FOR_EACH_NORTHD_NO_HV([
 AT_SETUP([NAT with match])
 ovn_start
diff --git a/tests/ovn.at b/tests/ovn.at
index 877d0dfdba..df3c1f942d 100644
--- a/tests/ovn.at
+++ b/tests/ovn.at
@@ -329,6 +329,9 @@  ct.trk = ct_state[5]
 ct_label = NXM_NX_CT_LABEL
 ct_label.ecmp_reply_eth = ct_label[32..79]
 ct_label.label = ct_label[96..127]
+ct_label.obs_domain_id = ct_label[88..95]
+ct_label.obs_point_id = ct_label[96..127]
+ct_label.obs_unused = ct_label[0..87]
 ct_mark = NXM_NX_CT_MARK
 ct_mark.blocked = ct_mark[0]
 ct_mark.ecmp_reply_port = ct_mark[16..31]
@@ -1355,6 +1358,12 @@  ct_commit(ct_label=18446744073709551615);
 ct_commit(ct_label=18446744073709551616);
     Syntax error at `(' expecting `;'.
 
+# Observation domain and point id.
+ct_commit {ct_label.obs_domain_id = 42; ct_label.obs_point_id = reg2; };
+    formats as ct_commit { ct_label.obs_domain_id = 42; ct_label.obs_point_id = reg2; };
+    encodes as ct(commit,zone=NXM_NX_REG13[[0..15]],exec(set_field:0x2a0000000000000000000000/0xff0000000000000000000000->ct_label,move:NXM_NX_XXREG0[[32..63]]->NXM_NX_CT_LABEL[[96..127]]))
+    has prereqs ip
+
 ct_mark = 12345
     Field ct_mark is not modifiable.
 ct_mark.blocked = 1/1
diff --git a/tests/system-common-macros.at b/tests/system-common-macros.at
index 691c271a3a..c595561734 100644
--- a/tests/system-common-macros.at
+++ b/tests/system-common-macros.at
@@ -237,6 +237,17 @@  m4_define([STRIP_MONITOR_CSUM], [grep "csum:" | sed 's/csum:.*/csum: <skip>/'])
 m4_define([FORMAT_CT],
     [[grep -F "dst=$1," | sed -e 's/port=[0-9]*/port=<cleared>/g' -e 's/id=[0-9]*/id=<cleared>/g' -e 's/state=[0-9_A-Z]*/state=<cleared>/g' | sort | uniq]])
 
+# DAEMONIZE([command], [pidfile])
+#
+# Run 'command' as a background process and record its pid to 'pidfile' to
+# allow cleanup on exit.
+#
+m4_define([DAEMONIZE],
+   [$1 & echo $! > $2
+     echo "kill \`cat $2\`" >> cleanup
+   ]
+)
+
 # NETNS_DAEMONIZE([namespace], [command], [pidfile])
 #
 # Run 'command' as a background process within 'namespace' and record its pid
diff --git a/tests/system-ovn.at b/tests/system-ovn.at
index ddb3d14e92..14f12524c0 100644
--- a/tests/system-ovn.at
+++ b/tests/system-ovn.at
@@ -13022,3 +13022,152 @@  OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port patch-.*/d
 /connection dropped.*/d"])
 AT_CLEANUP
 ])
+
+OVN_FOR_EACH_NORTHD([
+AT_SETUP([ovn -- ACL Sampling])
+AT_SKIP_IF([test $HAVE_TCPDUMP = no])
+AT_SKIP_IF([test $HAVE_NFCAPD = no])
+AT_SKIP_IF([test $HAVE_NFDUMP = no])
+AT_KEYWORDS([ACL])
+
+CHECK_CONNTRACK()
+CHECK_CONNTRACK_NAT()
+ovn_start
+OVS_TRAFFIC_VSWITCHD_START()
+ADD_BR([br-int])
+
+dnl Set external-ids in br-int needed for ovn-controller
+check ovs-vsctl \
+        -- set Open_vSwitch . external-ids:system-id=hv1 \
+        -- set Open_vSwitch . external-ids:ovn-remote=unix:$ovs_base/ovn-sb/ovn-sb.sock \
+        -- set Open_vSwitch . external-ids:ovn-encap-type=geneve \
+        -- set Open_vSwitch . external-ids:ovn-encap-ip=169.0.0.1 \
+        -- set bridge br-int fail-mode=secure other-config:disable-in-band=true
+
+dnl Start ovn-controller
+start_daemon ovn-controller
+
+dnl Logical network:
+dnl 1 logical switch connetected to one logical router
+dnl 3 UDP load balancers (ports 1000, 2000, 3000)
+dnl 2 VIFs
+
+check ovn-nbctl                                                  \
+    -- lr-add rtr                                                \
+    -- lrp-add rtr rtr-ls 00:00:00:00:01:00 42.42.42.1/24        \
+    -- ls-add ls                                                 \
+    -- lsp-add ls ls-rtr                                         \
+    -- lsp-set-addresses ls-rtr 00:00:00:00:01:00                \
+    -- lsp-set-type ls-rtr router                                \
+    -- lsp-set-options ls-rtr router-port=rtr-ls                 \
+    -- lsp-add ls vm1 -- lsp-set-addresses vm1 00:00:00:00:00:01 \
+    -- lsp-add ls vm2 -- lsp-set-addresses vm2 00:00:00:00:00:02 \
+    -- lb-add lb1 43.43.43.43:1000 42.42.42.3:1000 udp           \
+    -- lb-add lb2 43.43.43.43:2000 42.42.42.3:2000 udp           \
+    -- lb-add lb3 43.43.43.43:3000 42.42.42.3:3000 udp           \
+    -- ls-lb-add ls lb1                                          \
+    -- ls-lb-add ls lb2                                          \
+    -- ls-lb-add ls lb3
+
+ADD_NAMESPACES(vm1)
+ADD_VETH(vm1, vm1, br-int, "42.42.42.2/24", "00:00:00:00:00:01", "42.42.42.1")
+
+ADD_NAMESPACES(vm2)
+ADD_VETH(vm2, vm2, br-int, "42.42.42.3/24", "00:00:00:00:00:02", "42.42.42.1")
+
+collector1=$(ovn-nbctl create Sample_Collector name=test-collector1 probability=65535 set_id=1)
+collector2=$(ovn-nbctl create Sample_Collector name=test-collector2 probability=65535 set_id=2)
+check_row_count nb:Sample_Collector 2
+
+ovn-nbctl create Sampling_App name="acl-new-traffic-sampling" id="42"
+ovn-nbctl create Sampling_App name="acl-est-traffic-sampling" id="43"
+check_row_count nb:Sampling_App 2
+
+sample1=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=1001)
+sample2=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=1002)
+sample3=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=2001)
+sample4=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=2002)
+sample5=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=3001)
+sample6=$(ovn-nbctl create Sample collector="$collector1 $collector2" metadata=3002)
+check_row_count nb:Sample 6
+
+dnl Create ACLs that match the 3 types of traffic in all 3 possible stages:
+dnl from-lport, from-lport-after-lb, to-lport
+check ovn-nbctl --sample-new=$sample1 --sample-est=$sample2 acl-add ls from-lport 1 "inport == \"vm1\" && udp.dst == 1000" allow-related
+check ovn-nbctl --apply-after-lb --sample-new=$sample3 --sample-est=$sample4 acl-add ls from-lport 1 "inport == \"vm1\" && udp.dst == 2000" allow-related
+check ovn-nbctl --sample-new=$sample5 --sample-est=$sample6 acl-add ls to-lport 1 "outport == \"vm2\" && udp.dst == 3000" allow-related
+
+dnl Wait for ovn-controller to catch up.
+wait_for_ports_up
+check ovn-nbctl --wait=hv sync
+
+dnl Start an IPFIX collector.
+DAEMONIZE([nfcapd -B 1024000 -w . -p 4242 2> collector.err], [collector.pid])
+
+dnl Wait for the collector to be up.
+OVS_WAIT_UNTIL([grep -q 'Startup nfcapd.' collector.err])
+
+dnl Configure the OVS flow sample collector.
+ovs-vsctl --id=@br get Bridge br-int \
+    -- --id=@ipfix create IPFIX targets=\"127.0.0.1:4242\" template_interval=1 \
+    -- --id=@cs create Flow_Sample_Collector_Set id=1 bridge=@br ipfix=@ipfix
+
+dnl And wait for it to be up and running.
+OVS_WAIT_UNTIL([ovs-ofctl dump-ipfix-flow br-int | grep -q '1 ids'])
+
+dnl Start UDP echo server on vm2.
+NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 1000], [nc-vm2-1000.pid])
+NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 2000], [nc-vm2-2000.pid])
+NETNS_DAEMONIZE([vm2], [nc -e /bin/cat -k -u -v -l 3000], [nc-vm2-3000.pid])
+
+dnl Send traffic to the UDP LB1 (hits the from-lport ACL).
+NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 1000])
+
+dnl Send traffic to the UDP LB1 (hits the from-lport after-lb ACL).
+NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 2000])
+
+dnl Send traffic to the UDP LB1 (hits the from-lport ACL).
+NS_CHECK_EXEC([vm1], [echo a | nc --send-only -u 43.43.43.43 3000])
+
+dnl Wait until OVS sampled all expected packets (2 data packets + 1 ICMP
+dnl port unreachable error on each session).
+OVS_WAIT_UNTIL([ovs-ofctl dump-ipfix-flow br-int | grep -q 'sampled pkts=9'])
+
+dnl Check the IPFIX samples.
+kill $(cat collector.pid)
+OVS_WAIT_WHILE([kill -0 $(cat collector.pid) 2>/dev/null])
+
+dnl Can't match on observation domain ID due to the followig fix not being
+dnl available in any released version of nfdump:
+dnl https://github.com/phaag/nfdump/issues/544
+dnl
+dnl Only match on the point ID.
+AT_CHECK([nfdump -r nfcapd.* -o json | grep observationPointID | awk '{$1=$1;print}' | sort], [0], [dnl
+"observationPointID" : 1001,
+"observationPointID" : 1002,
+"observationPointID" : 1002,
+"observationPointID" : 2001,
+"observationPointID" : 2002,
+"observationPointID" : 2002,
+"observationPointID" : 3001,
+"observationPointID" : 3002,
+"observationPointID" : 3002,
+])
+
+OVS_APP_EXIT_AND_WAIT([ovn-controller])
+
+as ovn-sb
+OVS_APP_EXIT_AND_WAIT([ovsdb-server])
+
+as ovn-nb
+OVS_APP_EXIT_AND_WAIT([ovsdb-server])
+
+as northd
+OVS_APP_EXIT_AND_WAIT([ovn-northd])
+
+as
+OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port patch-.*/d
+/connection dropped.*/d"])
+
+AT_CLEANUP
+])
diff --git a/utilities/containers/fedora/Dockerfile b/utilities/containers/fedora/Dockerfile
index 078180cff3..4dce1e32b4 100755
--- a/utilities/containers/fedora/Dockerfile
+++ b/utilities/containers/fedora/Dockerfile
@@ -27,6 +27,7 @@  RUN dnf -y update \
         libcap-ng-devel \
         libtool \
         net-tools \
+        nfdump \
         ninja-build \
         nmap-ncat \
         numactl-devel \
diff --git a/utilities/containers/ubuntu/Dockerfile b/utilities/containers/ubuntu/Dockerfile
index 7cf0751225..073afa8764 100755
--- a/utilities/containers/ubuntu/Dockerfile
+++ b/utilities/containers/ubuntu/Dockerfile
@@ -33,6 +33,7 @@  RUN apt update -y \
         llvm-dev \
         ncat \
         net-tools \
+        nfdump \
         ninja-build \
         python3-dev \
         python3-pip \
diff --git a/utilities/ovn-nbctl.8.xml b/utilities/ovn-nbctl.8.xml
index e2657ca02c..e1e5b681e1 100644
--- a/utilities/ovn-nbctl.8.xml
+++ b/utilities/ovn-nbctl.8.xml
@@ -399,7 +399,7 @@ 
       must be either <code>switch</code> or <code>port-group</code>.
     </p>
     <dl>
-        <dt>[<code>--type=</code>{<code>switch</code> | <code>port-group</code>}] [<code>--log</code>] [<code>--meter=</code><var>meter</var>] [<code>--severity=</code><var>severity</var>] [<code>--name=</code><var>name</var>] [<code>--label=</code><var>label</var>] [<code>--may-exist</code>] [<code>--apply-after-lb</code>] [<code>--tier</code>] <code>acl-add</code> <var>entity</var> <var>direction</var> <var>priority</var> <var>match</var> <var>verdict</var></dt>
+        <dt>[<code>--type=</code>{<code>switch</code> | <code>port-group</code>}] [<code>--log</code>] [<code>--meter=</code><var>meter</var>] [<code>--severity=</code><var>severity</var>] [<code>--name=</code><var>name</var>] [<code>--label=</code><var>label</var>] [<code>--sample-new=</code><var>sample</var>] [<code>--sample-est=</code><var>sample</var>] [<code>--may-exist</code>] [<code>--apply-after-lb</code>] [<code>--tier</code>] <code>acl-add</code> <var>entity</var> <var>direction</var> <var>priority</var> <var>match</var> <var>verdict</var></dt>
       <dd>
         <p>
           Adds the specified ACL to <var>entity</var>.  <var>direction</var>
@@ -424,6 +424,12 @@ 
           names a meter configured by <code>meter-add</code>.
         </p>
 
+        <p>
+          The <code>--sample-new</code> (and optionally
+          <code>--sample-est</code>) enable ACL sampling. A valid uuid of a
+          row of the <ref table="Sample"/> table must be provided.
+        </p>
+
         <p>
           The <code>--apply-after-lb</code> option sets
           <code>apply-after-lb=true</code> in the <code>options</code> column
diff --git a/utilities/ovn-nbctl.c b/utilities/ovn-nbctl.c
index c37cc010ce..816f26cf4b 100644
--- a/utilities/ovn-nbctl.c
+++ b/utilities/ovn-nbctl.c
@@ -2308,6 +2308,11 @@  nbctl_pre_acl(struct ctl_context *ctx)
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_match);
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_options);
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_tier);
+    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_new);
+    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_est);
+
+    ovsdb_idl_add_table(ctx->idl, &nbrec_table_sample_collector);
+    ovsdb_idl_add_table(ctx->idl, &nbrec_table_sample);
 }
 
 static void
@@ -2321,6 +2326,8 @@  nbctl_pre_acl_list(struct ctl_context *ctx)
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_severity);
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_meter);
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_label);
+    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_new);
+    ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_sample_est);
     ovsdb_idl_add_column(ctx->idl, &nbrec_acl_col_options);
 }
 
@@ -2372,6 +2379,8 @@  nbctl_acl_add(struct ctl_context *ctx)
     const char *severity = shash_find_data(&ctx->options, "--severity");
     const char *name = shash_find_data(&ctx->options, "--name");
     const char *meter = shash_find_data(&ctx->options, "--meter");
+    const char *sample_new = shash_find_data(&ctx->options, "--sample-new");
+    const char *sample_est = shash_find_data(&ctx->options, "--sample-est");
     if (log || severity || name || meter) {
         nbrec_acl_set_log(acl, true);
     }
@@ -2388,6 +2397,38 @@  nbctl_acl_add(struct ctl_context *ctx)
     if (meter) {
         nbrec_acl_set_meter(acl, meter);
     }
+    if (sample_new) {
+        const struct nbrec_sample *sample_elem = NULL;
+        struct uuid sample_uuid;
+
+        if (uuid_from_string(&sample_uuid, sample_new)) {
+            sample_elem = nbrec_sample_get_for_uuid(ctx->idl, &sample_uuid);
+            if (!sample_elem) {
+                ctl_error(ctx, "sample record not found");
+                return;
+            }
+            nbrec_acl_set_sample_new(acl, sample_elem);
+        } else {
+            ctl_error(ctx, "a valid uuid must be provided");
+            return;
+        }
+    }
+    if (sample_est) {
+        const struct nbrec_sample *sample_elem = NULL;
+        struct uuid sample_uuid;
+
+        if (uuid_from_string(&sample_uuid, sample_est)) {
+            sample_elem = nbrec_sample_get_for_uuid(ctx->idl, &sample_uuid);
+            if (!sample_elem) {
+                ctl_error(ctx, "sample record not found");
+                return;
+            }
+            nbrec_acl_set_sample_est(acl, sample_elem);
+        } else {
+            ctl_error(ctx, "a valid uuid must be provided");
+            return;
+        }
+    }
 
     /* Set the ACL label */
     const char *label = shash_find_data(&ctx->options, "--label");
@@ -7915,7 +7956,7 @@  static const struct ctl_command_syntax nbctl_commands[] = {
     { "acl-add", 5, 6, "{SWITCH | PORTGROUP} DIRECTION PRIORITY MATCH ACTION",
       nbctl_pre_acl, nbctl_acl_add, NULL,
       "--log,--may-exist,--type=,--name=,--severity=,--meter=,--label=,"
-      "--apply-after-lb,--tier=", RW },
+      "--apply-after-lb,--tier=,--sample-new=,--sample-est=", RW },
     { "acl-del", 1, 4, "{SWITCH | PORTGROUP} [DIRECTION [PRIORITY MATCH]]",
       nbctl_pre_acl, nbctl_acl_del, NULL, "--type=,--tier=", RW },
     { "acl-list", 1, 1, "{SWITCH | PORTGROUP}",