diff mbox series

[ovs-dev,v5,04/13] python: ovs: flowviz: Add file processing infra.

Message ID 20240710170504.2162803-5-amorenoz@redhat.com
State Changes Requested
Headers show
Series Add flow visualization utility. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/intel-ovs-compilation fail test: fail

Commit Message

Adrián Moreno July 10, 2024, 5:04 p.m. UTC
process.py contains a useful base class that processes files. Datapath
flow processing is pmd-thread-aware.

odp.py and ofp.py: contain datapath and openflow subcommand definitions
as well as the first formatting option: json.

Also, this patch adds basic filtering support.

Examples:
$ ovs-ofctl dump-flows br-int | ovs-flowviz openflow json
$ ovs-ofctl dump-flows br-int > flows.txt && ovs-flowviz -i flows.txt openflow json
$ ovs-ofctl appctl dpctl/dump-flows | ovs-flowviz -f 'ct' datapath json
$ ovs-ofctl appctl dpctl/dump-flows > flows.txt && ovs-flowviz -i flows.txt -f 'drop' datapath json

Signed-off-by: Adrian Moreno <amorenoz@redhat.com>
---
 python/automake.mk             |   5 +-
 python/ovs/flowviz/__init__.py |   2 +
 python/ovs/flowviz/main.py     | 102 ++++++++++++-
 python/ovs/flowviz/odp/cli.py  |  34 +++++
 python/ovs/flowviz/ofp/cli.py  |  34 +++++
 python/ovs/flowviz/process.py  | 255 +++++++++++++++++++++++++++++++++
 6 files changed, 430 insertions(+), 2 deletions(-)
 create mode 100644 python/ovs/flowviz/odp/cli.py
 create mode 100644 python/ovs/flowviz/ofp/cli.py
 create mode 100644 python/ovs/flowviz/process.py

Comments

Eelco Chaudron Aug. 16, 2024, 1:27 p.m. UTC | #1
On 10 Jul 2024, at 19:04, Adrian Moreno wrote:

> process.py contains a useful base class that processes files. Datapath
> flow processing is pmd-thread-aware.
>
> odp.py and ofp.py: contain datapath and openflow subcommand definitions
> as well as the first formatting option: json.
>
> Also, this patch adds basic filtering support.
>
> Examples:
> $ ovs-ofctl dump-flows br-int | ovs-flowviz openflow json
> $ ovs-ofctl dump-flows br-int > flows.txt && ovs-flowviz -i flows.txt openflow json
> $ ovs-ofctl appctl dpctl/dump-flows | ovs-flowviz -f 'ct' datapath json
> $ ovs-ofctl appctl dpctl/dump-flows > flows.txt && ovs-flowviz -i flows.txt -f 'drop' datapath json
>
> Signed-off-by: Adrian Moreno <amorenoz@redhat.com>
Thanks for sending the v5, the changes look good to me.

//Eelco

Acked-by: Eelco Chaudron <echaudro@redhat.com>
diff mbox series

Patch

diff --git a/python/automake.mk b/python/automake.mk
index 124032c92..fd5e74081 100644
--- a/python/automake.mk
+++ b/python/automake.mk
@@ -67,8 +67,11 @@  ovs_flowviz = \
 	python/ovs/flowviz/__init__.py \
 	python/ovs/flowviz/main.py \
 	python/ovs/flowviz/odp/__init__.py \
+	python/ovs/flowviz/odp/cli.py \
 	python/ovs/flowviz/ofp/__init__.py \
-	python/ovs/flowviz/ovs-flowviz
+	python/ovs/flowviz/ofp/cli.py \
+	python/ovs/flowviz/ovs-flowviz \
+	python/ovs/flowviz/process.py
 
 
 # These python files are used at build time but not runtime,
diff --git a/python/ovs/flowviz/__init__.py b/python/ovs/flowviz/__init__.py
index e69de29bb..898dba522 100644
--- a/python/ovs/flowviz/__init__.py
+++ b/python/ovs/flowviz/__init__.py
@@ -0,0 +1,2 @@ 
+import ovs.flowviz.ofp.cli  # noqa: F401
+import ovs.flowviz.odp.cli  # noqa: F401
diff --git a/python/ovs/flowviz/main.py b/python/ovs/flowviz/main.py
index f5bf142be..64b0e8a0a 100644
--- a/python/ovs/flowviz/main.py
+++ b/python/ovs/flowviz/main.py
@@ -13,17 +13,64 @@ 
 # limitations under the License.
 
 import click
+import os
+
+from ovs.flow.filter import OFFilter
 
 
 class Options(dict):
     """Options dictionary"""
 
 
+def validate_input(ctx, param, value):
+    """Validate the "-i" option"""
+    result = list()
+    for input_str in value:
+        parts = input_str.strip().split(",")
+        if len(parts) == 2:
+            file_parts = tuple(parts)
+        elif len(parts) == 1:
+            file_parts = tuple(["Filename: " + parts[0], parts[0]])
+        else:
+            raise click.BadParameter(
+                "input filename should have the following format: "
+                "[alias,]FILENAME"
+            )
+
+        if not os.path.isfile(file_parts[1]):
+            raise click.BadParameter(
+                "input filename %s does not exist" % file_parts[1]
+            )
+        result.append(file_parts)
+    return result
+
+
 @click.group(
     context_settings=dict(help_option_names=["-h", "--help"]),
 )
+@click.option(
+    "-i",
+    "--input",
+    "filename",
+    help="Read flows from specified filepath. If not provided, flows will be"
+    " read from stdin. This option can be specified multiple times."
+    " Format [alias,]FILENAME. Where alias is a name that shall be used to"
+    " refer to this FILENAME",
+    multiple=True,
+    type=click.Path(),
+    callback=validate_input,
+)
+@click.option(
+    "-f",
+    "--filter",
+    help="Filter flows that match the filter expression."
+    "Run 'ovs-flowviz filter' for a detailed description of the filtering "
+    "syntax",
+    type=str,
+    show_default=False,
+)
 @click.pass_context
-def maincli(ctx):
+def maincli(ctx, filename, filter):
     """
     OpenvSwitch flow visualization utility.
 
@@ -31,6 +78,59 @@  def maincli(ctx):
     (such as the output of ovs-ofctl dump-flows or ovs-appctl dpctl/dump-flows)
     and prints them in different formats.
     """
+    ctx.obj = Options()
+    ctx.obj["filename"] = filename or None
+    if filter:
+        try:
+            ctx.obj["filter"] = OFFilter(filter)
+        except Exception as e:
+            raise click.BadParameter("Wrong filter syntax: {}".format(e))
+
+
+@maincli.command(hidden=True)
+@click.pass_context
+def filter(ctx):
+    """
+    \b
+    Filter Syntax
+    *************
+
+     [! | not ] {key}[[.subkey]...] [OPERATOR] {value})] [LOGICAL OPERATOR] ...
+
+    \b
+    Comparison operators are:
+        =   equality
+        <   less than
+        >   more than
+        ~=  masking (valid for IP and Ethernet fields)
+
+    \b
+    Logical operators are:
+        !{expr}:  NOT
+        {expr} && {expr}: AND
+        {expr} || {expr}: OR
+
+    \b
+    Matches and flow metadata:
+        To compare against a match or info field, use the field directly, e.g:
+            priority=100
+            n_bytes>10
+        Use simple keywords for flags:
+            tcp and ip_src=192.168.1.1
+    \b
+    Actions:
+        Actions values might be dictionaries, use subkeys to access individual
+        values, e.g:
+            output.port=3
+        Use simple keywords for flags
+            drop
+
+    \b
+    Examples of valid filters.
+        nw_addr~=192.168.1.1 && (tcp_dst=80 || tcp_dst=443)
+        arp=true && !arp_tsa=192.168.1.1
+        n_bytes>0 && drop=true"""
+    click.echo(ctx.command.get_help(ctx))
 
 
 def main():
diff --git a/python/ovs/flowviz/odp/cli.py b/python/ovs/flowviz/odp/cli.py
new file mode 100644
index 000000000..0f9118b80
--- /dev/null
+++ b/python/ovs/flowviz/odp/cli.py
@@ -0,0 +1,34 @@ 
+# Copyright (c) 2023 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import click
+
+from ovs.flowviz.main import maincli
+from ovs.flowviz.process import JSONDatapathProcessor
+
+
+@maincli.group(subcommand_metavar="FORMAT")
+@click.pass_obj
+def datapath(opts):
+    """Process Datapath Flows."""
+    pass
+
+
+@datapath.command()
+@click.pass_obj
+def json(opts):
+    """Print the flows in JSON format."""
+    proc = JSONDatapathProcessor(opts)
+    proc.process()
+    print(proc.json_string())
diff --git a/python/ovs/flowviz/ofp/cli.py b/python/ovs/flowviz/ofp/cli.py
new file mode 100644
index 000000000..6b815044d
--- /dev/null
+++ b/python/ovs/flowviz/ofp/cli.py
@@ -0,0 +1,34 @@ 
+# Copyright (c) 2023 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import click
+
+from ovs.flowviz.main import maincli
+from ovs.flowviz.process import JSONOpenFlowProcessor
+
+
+@maincli.group(subcommand_metavar="FORMAT")
+@click.pass_obj
+def openflow(opts):
+    """Process OpenFlow Flows."""
+    pass
+
+
+@openflow.command()
+@click.pass_obj
+def json(opts):
+    """Print the flows in JSON format."""
+    proc = JSONOpenFlowProcessor(opts)
+    proc.process()
+    print(proc.json_string())
diff --git a/python/ovs/flowviz/process.py b/python/ovs/flowviz/process.py
new file mode 100644
index 000000000..1be74fbff
--- /dev/null
+++ b/python/ovs/flowviz/process.py
@@ -0,0 +1,255 @@ 
+# Copyright (c) 2023 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import json
+import click
+
+from ovs.flow.decoders import FlowEncoder
+from ovs.flow.odp import ODPFlow
+from ovs.flow.ofp import OFPFlow
+
+
+class FileProcessor(object):
+    """Base class for file-based Flow processing. It is able to create flows
+    from strings found in a file (or stdin).
+
+    The process of parsing the flows is extendable in many ways by deriving
+    this class.
+
+    When process() is called, the base class will:
+        - call self.start_file() for each new file that get's processed
+        - call self.start_thread() for each thread (Datapath flow only)
+        - apply the filter defined in opts if provided (can be optionally
+            disabled)
+        - call self.process_flow() for after the flow has been filtered
+        - call self.stop_thread() after the thread block has been processed
+            (Datapath flow only)
+        - call self.stop_file() after the file has been processed entirely
+
+    In the case of stdin, the filename and file alias is 'stdin'.
+
+    Args:
+        opts (dict): Options dictionary
+        flow_type (str): ["ofp", "odp"]
+    """
+
+    def __init__(self, opts, flow_type):
+        self.opts = opts
+        assert flow_type in ["ofp", "odp"]
+        self.flow_type = flow_type
+        self.current_thread = None
+
+    # Methods that must be implemented by derived classes.
+    def init(self):
+        """Called before the flow processing begins."""
+        pass
+
+    def start_file(self, alias, filename):
+        """Called before the processing of a file begins.
+        Args:
+            alias(str): The alias name of the filename
+            filename(str): The filename string
+        """
+        pass
+
+    def start_thread(self, thread):
+        """Called before the processing of a thread block begins.
+        Args:
+            thread(str): The thread name ("main" or "pmd at cpu core $N")
+        """
+        raise NotImplementedError
+
+    def process_flow(self, flow, name):
+        """Called for built flow (after filtering).
+        Args:
+            flow(Flow): The OpenFlow or Datapath flow.
+            name(str): The name of the file from which the flow comes
+        """
+        raise NotImplementedError
+
+    def stop_thread(self, thread):
+        """Called after the processing of a thread ends.
+        Args:
+            thread(str): The thread name ("main" or "pmd at cpu core $N")
+        """
+        raise NotImplementedError
+
+    def stop_file(self, alias, filename):
+        """Called after the processing of a file ends.
+        Args:
+            alias(str): The alias name of the filename
+            filename(str): The filename string
+        """
+        pass
+
+    def end(self):
+        """Called after the processing ends."""
+        pass
+
+    def process_line(self, line, idx):
+        if self.flow_type == "odp":
+            next_thread = self.current_thread
+            if line.startswith("flow-dump from the main thread"):
+                next_thread = "main"
+            elif line.startswith("flow-dump from pmd on cpu core"):
+                next_thread = line.removeprefix("flow-dump from ").strip("\n")
+
+            if next_thread != self.current_thread:
+                if self.current_thread:
+                    self.stop_thread(self.current_thread)
+                self.start_thread(next_thread)
+                self.current_thread = next_thread
+                return None
+
+            return ODPFlow(line, idx)
+
+        elif self.flow_type == "ofp":
+            # Skip strings commonly found in OpenFlow flow dumps.
+            if " reply " in line:
+                return None
+
+            return OFPFlow(line, idx)
+
+    def process(self, do_filter=True):
+        idx = 0
+        filenames = self.opts.get("filename")
+        filt = self.opts.get("filter") if do_filter else None
+        self.init()
+        if filenames:
+            for alias, filename in filenames:
+                try:
+                    with open(filename) as f:
+                        self.start_file(alias, filename)
+                        for line in f:
+                            flow = self.process_line(line, idx)
+                            idx += 1
+                            if not flow or (filt and not filt.evaluate(flow)):
+                                continue
+                            self.process_flow(flow, alias)
+                        if self.current_thread:
+                            self.stop_thread(self.current_thread)
+                        self.stop_file(alias, filename)
+                except IOError as e:
+                    raise click.BadParameter(
+                        "Failed to read from file {} ({}): {}".format(
+                            filename, e.errno, e.strerror
+                        )
+                    )
+        else:
+            data = sys.stdin.read()
+            self.start_file("stdin", "stdin")
+            for line in data.split("\n"):
+                line = line.strip()
+                if line:
+                    flow = self.process_line(line, idx)
+                    idx += 1
+                    if (
+                        not flow
+                        or not getattr(flow, "_sections", None)
+                        or (filt and not filt.evaluate(flow))
+                    ):
+                        continue
+                    self.process_flow(flow, "stdin")
+            if self.current_thread:
+                self.stop_thread(self.current_thread)
+            self.stop_file("stdin", "stdin")
+        self.end()
+
+
+class JSONOpenFlowProcessor(FileProcessor):
+    """A FileProcessor that prints OpenFlow flows in JSON format."""
+
+    def __init__(self, opts):
+        super().__init__(opts, "ofp")
+        self.flows = dict()
+
+    def start_file(self, name, filename):
+        self.flows_list = list()
+
+    def stop_file(self, name, filename):
+        self.flows[name] = self.flows_list
+
+    def process_flow(self, flow, name):
+        self.flows_list.append(flow)
+
+    def json_string(self):
+        if len(self.flows.keys()) > 1:
+            return json.dumps(
+                [
+                    {"name": name, "flows": [flow.dict() for flow in flows]}
+                    for name, flows in self.flows.items()
+                ],
+                indent=4,
+                cls=FlowEncoder,
+            )
+        return json.dumps(
+            [flow.dict() for flow in self.flows_list],
+            indent=4,
+            cls=FlowEncoder,
+        )
+
+
+class JSONDatapathProcessor(FileProcessor):
+    """A FileProcessor that prints Datapath flows in JSON format."""
+
+    def __init__(self, opts):
+        super().__init__(opts, "odp")
+        self.data = {}
+        self.thread = None
+        self.file = None
+
+    def start_file(self, name, filename):
+        self.per_thread_flows = None
+        self.flows_list = []
+
+    def start_thread(self, name):
+        if not self.per_thread_flows:
+            self.per_thread_flows = {}
+
+    def stop_thread(self, name):
+        self.per_thread_flows[name] = self.flows_list
+
+    def stop_file(self, name, filename):
+        if self.per_thread_flows:
+            self.data[name] = self.per_thread_flows
+        else:
+            self.data[name] = self.flows_list
+
+    def process_flow(self, flow, name):
+        self.flows_list.append(flow)
+
+    def json_string(self):
+        opts = {
+            "indent": 4,
+            "cls": FlowEncoder,
+        }
+
+        def thread_data(data):
+            if isinstance(data, dict):
+                return {
+                    thread: [flow.dict() for flow in flows]
+                    for thread, flows in data.items()
+                }
+            return [flow.dict() for flow in data]
+
+        if len(self.data.keys()) > 1:
+            jsondata = {}
+            for file, file_data in self.data.items():
+                jsondata[file] = thread_data(file_data)
+            return json.dumps(jsondata, **opts)
+        else:
+            return json.dumps(
+                thread_data(next(iter(self.data.values()))), **opts
+            )